
Commit 64ce6fd

🐛 Destination S3 and GCS - Fixed connector bug that prevented writing streams larger than 50GB (#5890)
Co-authored-by: ievgeniit <ievgeniit>
1 parent 69b2453 commit 64ce6fd
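
Why the streams were capped around 50GB: both destinations stream records to the bucket through S3-style multipart uploads, which accept at most 10,000 parts per object. With the previously hard-coded 5 MB part size, the largest object a sync could write was therefore roughly 48.8 GiB. The snippet below only illustrates that arithmetic; it is not code from this commit.

public class MultipartUploadCeiling {

  public static void main(final String[] args) {
    // S3-style multipart uploads accept at most 10,000 parts per object.
    final long maxParts = 10_000;
    // Both connectors previously buffered parts of a fixed 5 MB each.
    final long partSizeBytes = 5L * 1024 * 1024;
    // Largest object that fits under the part limit: ~48.8 GiB, hence the ~50GB ceiling.
    final double maxObjectGib = maxParts * partSizeBytes / (1024.0 * 1024 * 1024);
    System.out.printf("Max object size with 5 MB parts: %.1f GiB%n", maxObjectGib);
  }

}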

File tree

32 files changed: +938 -17 lines changed

airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/4816b78f-1489-44c1-9060-4b19d5fa9362.json

Lines changed: 1 addition & 1 deletion
@@ -2,6 +2,6 @@
   "destinationDefinitionId": "4816b78f-1489-44c1-9060-4b19d5fa9362",
   "name": "S3",
   "dockerRepository": "airbyte/destination-s3",
-  "dockerImageTag": "0.1.11",
+  "dockerImageTag": "0.1.12",
   "documentationUrl": "https://docs.airbyte.io/integrations/destinations/s3"
 }

airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/ca8f6566-e555-4b40-943a-545bf123117a.json

Lines changed: 1 addition & 1 deletion
@@ -2,6 +2,6 @@
   "destinationDefinitionId": "ca8f6566-e555-4b40-943a-545bf123117a",
   "name": "Google Cloud Storage (GCS)",
   "dockerRepository": "airbyte/destination-gcs",
-  "dockerImageTag": "0.1.0",
+  "dockerImageTag": "0.1.2",
   "documentationUrl": "https://docs.airbyte.io/integrations/destinations/gcs"
 }

airbyte-config/init/src/main/resources/seed/destination_definitions.yaml

Lines changed: 2 additions & 2 deletions
@@ -32,7 +32,7 @@
 - destinationDefinitionId: ca8f6566-e555-4b40-943a-545bf123117a
   name: Google Cloud Storage (GCS)
   dockerRepository: airbyte/destination-gcs
-  dockerImageTag: 0.1.0
+  dockerImageTag: 0.1.2
   documentationUrl: https://docs.airbyte.io/integrations/destinations/gcs
 - destinationDefinitionId: 356668e2-7e34-47f3-a3b0-67a8a481b692
   name: Google PubSub
@@ -47,7 +47,7 @@
 - destinationDefinitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362
   name: S3
   dockerRepository: airbyte/destination-s3
-  dockerImageTag: 0.1.11
+  dockerImageTag: 0.1.12
   documentationUrl: https://docs.airbyte.io/integrations/destinations/s3
 - destinationDefinitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
   name: Redshift

airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/DestinationAcceptanceTest.java

Lines changed: 126 additions & 0 deletions
@@ -56,6 +56,8 @@
 import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
 import io.airbyte.protocol.models.ConnectorSpecification;
 import io.airbyte.protocol.models.DestinationSyncMode;
+import io.airbyte.protocol.models.Field;
+import io.airbyte.protocol.models.JsonSchemaPrimitive;
 import io.airbyte.protocol.models.SyncMode;
 import io.airbyte.workers.DbtTransformationRunner;
 import io.airbyte.workers.DefaultCheckConnectionWorker;
@@ -82,9 +84,12 @@
 import java.util.Map.Entry;
 import java.util.Random;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
+import org.joda.time.DateTime;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ArgumentsSource;
@@ -1155,4 +1160,125 @@ public String toString() {
 
   }
 
+  /**
+   * This test MUST be disabled by default, but you may enable it when you need to reproduce a
+   * performance issue for a destination. It emulates many streams with many messages in each:
+   * simply change the "streamsSize" variable to set the number of tables/streams and
+   * "messagesNumber" to the number of messages written to each stream. !!! Do NOT forget to
+   * manually remove all generated objects !!! Hint: to check the destination container's output,
+   * run "docker ps" in a console to find the container id, then run "docker container attach
+   * your_container_id" (e.g. docker container attach 18cc929f44c8) to see the container's output.
+   */
+  @Test
+  @Disabled
+  public void testStressPerformance() throws Exception {
+    final int streamsSize = 5; // number of generated streams
+    final int messagesNumber = 300; // number of messages to be written to each generated stream
+
+    // Each stream will have an id and a name field
+    final String USERS_STREAM_NAME = "users"; // stream name prefix; streams will be "users0", "users1", etc.
+    final String ID = "id";
+    final String NAME = "name";
+
+    // generate schema/catalogs
+    List<AirbyteStream> configuredAirbyteStreams = new ArrayList<>();
+    for (int i = 0; i < streamsSize; i++) {
+      configuredAirbyteStreams
+          .add(CatalogHelpers.createAirbyteStream(USERS_STREAM_NAME + i,
+              Field.of(NAME, JsonSchemaPrimitive.STRING),
+              Field.of(ID, JsonSchemaPrimitive.STRING)));
+    }
+    final AirbyteCatalog testCatalog = new AirbyteCatalog().withStreams(configuredAirbyteStreams);
+    final ConfiguredAirbyteCatalog configuredTestCatalog = CatalogHelpers
+        .toDefaultConfiguredCatalog(testCatalog);
+
+    final JsonNode config = getConfig();
+    final WorkerDestinationConfig destinationConfig = new WorkerDestinationConfig()
+        .withConnectionId(UUID.randomUUID())
+        .withCatalog(configuredTestCatalog)
+        .withDestinationConnectionConfiguration(config);
+    final AirbyteDestination destination = getDestination();
+
+    // Start destination
+    destination.start(destinationConfig, jobRoot);
+
+    AtomicInteger currentStreamNumber = new AtomicInteger(0);
+    AtomicInteger currentRecordNumberForStream = new AtomicInteger(0);
+
+    // this is just a progress logger, useful to see how far a long-running test has gotten
+    Thread countPrinter = new Thread(() -> {
+      while (true) {
+        System.out.println(
+            "currentStreamNumber=" + currentStreamNumber + ", currentRecordNumberForStream="
+                + currentRecordNumberForStream + ", " + DateTime.now());
+        try {
+          Thread.sleep(10000);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+      }
+    });
+    countPrinter.start();
+
+    // iterate through streams
+    for (int streamCounter = 0; streamCounter < streamsSize; streamCounter++) {
+      LOGGER.info("Started new stream processing with #" + streamCounter);
+      // iterate through the messages of a particular stream:
+      // generate messages and write them to the stream
+      for (int msgCounter = 0; msgCounter < messagesNumber; msgCounter++) {
+        AirbyteMessage msg = new AirbyteMessage()
+            .withType(AirbyteMessage.Type.RECORD)
+            .withRecord(new AirbyteRecordMessage().withStream(USERS_STREAM_NAME + streamCounter)
+                .withData(
+                    Jsons.jsonNode(
+                        ImmutableMap.builder().put(NAME, LOREM_IPSUM)
+                            .put(ID, streamCounter + "_" + msgCounter)
+                            .build()))
+                .withEmittedAt(Instant.now().toEpochMilli()));
+        try {
+          destination.accept(msg);
+        } catch (Exception e) {
+          LOGGER.error("Failed to write a RECORD message: " + e);
+          throw new RuntimeException(e);
+        }
+
+        currentRecordNumberForStream.set(msgCounter);
+      }
+
+      // send a state message here; it is required
+      AirbyteMessage msgState = new AirbyteMessage()
+          .withType(AirbyteMessage.Type.STATE)
+          .withState(new AirbyteStateMessage()
+              .withData(
+                  Jsons.jsonNode(ImmutableMap.builder().put("start_date", "2020-09-02").build())));
+      try {
+        destination.accept(msgState);
+      } catch (Exception e) {
+        LOGGER.error("Failed to write a STATE message: " + e);
+        throw new RuntimeException(e);
+      }
+
+      currentStreamNumber.set(streamCounter);
+    }
+
+    LOGGER.info(String
+        .format("Added %s messages to each of %s streams", currentRecordNumberForStream,
+            currentStreamNumber));
+    // Close destination
+    destination.notifyEndOfStream();
+  }
+
+  private final static String LOREM_IPSUM =
+      "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Pellentesque malesuada lacinia aliquet. Nam feugiat mauris vel magna dignissim feugiat. Nam non dapibus sapien, ac mattis purus. Donec mollis libero erat, a rutrum ipsum pretium id. Pellentesque habitant morbi tristique senectus et netus et malesuada fames ac turpis egestas. Integer nec aliquam leo. Aliquam eu dictum augue, a ornare elit.\n"
+          + "\n"
+          + "Nulla viverra blandit neque. Nam blandit varius efficitur. Nunc at sapien blandit, malesuada lectus vel, tincidunt orci. Proin blandit metus eget libero facilisis interdum. Aenean luctus scelerisque orci, at scelerisque sem vestibulum in. Nullam ornare massa sed dui efficitur, eget volutpat lectus elementum. Orci varius natoque penatibus et magnis dis parturient montes, nascetur ridiculus mus. Integer elementum mi vitae erat eleifend iaculis. Nullam eget tincidunt est, eget tempor est. Sed risus velit, iaculis vitae est in, volutpat consectetur odio. Aenean ut fringilla elit. Suspendisse non aliquet massa. Curabitur suscipit metus nunc, nec porttitor velit venenatis vel. Fusce vestibulum eleifend diam, lobortis auctor magna.\n"
+          + "\n"
+          + "Etiam maximus, mi feugiat pharetra mattis, nulla neque euismod metus, in congue nunc sem nec ligula. Curabitur aliquam, risus id convallis cursus, nunc orci sollicitudin enim, quis scelerisque nibh dui in ipsum. Suspendisse mollis, metus a dapibus scelerisque, sapien nulla pretium ipsum, non finibus sem orci et lectus. Aliquam dictum magna nisi, a consectetur urna euismod nec. In pulvinar facilisis nulla, id mollis libero pulvinar vel. Nam a commodo leo, eu commodo dolor. In hac habitasse platea dictumst. Curabitur auctor purus quis tortor laoreet efficitur. Quisque tincidunt, risus vel rutrum fermentum, libero urna dignissim augue, eget pulvinar nibh ligula ut tortor. Vivamus convallis non risus sed consectetur. Etiam accumsan enim ac nisl suscipit, vel congue lorem volutpat. Lorem ipsum dolor sit amet, consectetur adipiscing elit. Fusce non orci quis lacus rhoncus vestibulum nec ut magna. In varius lectus nec quam posuere finibus. Vivamus quis lectus vitae tortor sollicitudin fermentum.\n"
+          + "\n"
+          + "Pellentesque elementum vehicula egestas. Sed volutpat velit arcu, at imperdiet sapien consectetur facilisis. Suspendisse porttitor tincidunt interdum. Morbi gravida faucibus tortor, ut rutrum magna tincidunt a. Morbi eu nisi eget dui finibus hendrerit sit amet in augue. Aenean imperdiet lacus enim, a volutpat nulla placerat at. Suspendisse nibh ipsum, venenatis vel maximus ut, fringilla nec felis. Sed risus mi, egestas quis quam ullamcorper, pharetra vestibulum diam.\n"
+          + "\n"
+          + "Praesent finibus scelerisque elit, accumsan condimentum risus mattis vitae. Donec tristique hendrerit facilisis. Curabitur metus purus, venenatis non elementum id, finibus eu augue. Quisque posuere rhoncus ligula, et vehicula erat pulvinar at. Pellentesque vel quam vel lectus tincidunt congue quis id sapien. Ut efficitur mauris vitae pretium iaculis. Aliquam consectetur iaculis nisi vitae laoreet. Integer vel odio quis diam mattis tempor eget nec est. Donec iaculis facilisis neque, at dictum magna vestibulum ut. Sed malesuada non nunc ac consequat. Maecenas tempus lectus a nisl congue, ac venenatis diam viverra. Nam ac justo id nulla iaculis lobortis in eu ligula. Vivamus et ligula id sapien efficitur aliquet. Curabitur est justo, tempus vitae mollis quis, tincidunt vitae felis. Vestibulum molestie laoreet justo, nec mollis purus vulputate at.";
+
 }
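
To actually reproduce the original failure with this stress test, the default arguments are far too small. As a rough sizing aid only (it assumes each LOREM_IPSUM record serializes to roughly 3 KB of JSON, which is an estimate, not a figure from the commit), the sketch below computes how many messages a single stream needs to cross the old ceiling.

public class StressRunSizing {

  public static void main(final String[] args) {
    // Assumption: each LOREM_IPSUM record above serializes to roughly 3 KB of JSON.
    final long recordSizeBytes = 3L * 1024;
    // Aim slightly past the old ~50GB ceiling to reproduce the original failure.
    final long targetBytes = 55L * 1024 * 1024 * 1024;
    final long messagesNumber = targetBytes / recordSizeBytes;
    // With streamsSize = 1, roughly 19 million messages push a single stream past 50GB,
    // which failed on connector versions prior to this commit.
    System.out.println("messagesNumber needed for one >50GB stream: " + messagesNumber);
  }

}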

airbyte-integrations/connectors/destination-gcs/Dockerfile

Lines changed: 1 addition & 1 deletion
@@ -7,5 +7,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
 
 RUN tar xf ${APPLICATION}.tar --strip-components=1
 
-LABEL io.airbyte.version=0.1.0
+LABEL io.airbyte.version=0.1.2
 LABEL io.airbyte.name=airbyte/destination-gcs

airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/avro/GcsAvroWriter.java

Lines changed: 2 additions & 1 deletion
@@ -73,7 +73,8 @@ public GcsAvroWriter(GcsDestinationConfig config,
         objectKey);
 
     this.avroRecordFactory = new AvroRecordFactory(schema, nameUpdater);
-    this.uploadManager = S3StreamTransferManagerHelper.getDefault(config.getBucketName(), objectKey, s3Client);
+    this.uploadManager = S3StreamTransferManagerHelper.getDefault(
+        config.getBucketName(), objectKey, s3Client, config.getFormatConfig().getPartSize());
     // We only need one output stream as we only have one input stream. This is reasonably performant.
     this.outputStream = uploadManager.getMultiPartOutputStreams().get(0);
 
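
The same one-line change repeats in the CSV and JSONL writers below: each GCS writer now forwards the part size from its format config instead of relying on the helper's hard-coded default. Purely as a hedged sketch of what a part-size-aware helper might look like (assuming the s3-stream-upload StreamTransferManager builder these connectors wrap; the names and defaults here are illustrative, not copied from this commit):

import alex.mojaki.s3upload.StreamTransferManager;
import com.amazonaws.services.s3.AmazonS3;

public class S3StreamTransferManagerHelperSketch {

  // Default part size, matching the 5 MB default documented in spec.json.
  private static final long DEFAULT_PART_SIZE_MB = 5;

  public static StreamTransferManager getDefault(final String bucketName,
                                                 final String objectKey,
                                                 final AmazonS3 s3Client,
                                                 final Long partSizeMb) {
    // Fall back to the default when the connector config omits part_size_mb.
    final long partSize = partSizeMb != null ? partSizeMb : DEFAULT_PART_SIZE_MB;
    // The part size bounds memory use per stream and, together with the 10,000-part
    // limit, the maximum object size a single upload can reach.
    return new StreamTransferManager(bucketName, objectKey, s3Client)
        .numStreams(1)
        .partSize(partSize);
  }

}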

airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/csv/GcsCsvWriter.java

Lines changed: 2 additions & 1 deletion
@@ -74,7 +74,8 @@ public GcsCsvWriter(GcsDestinationConfig config,
     LOGGER.info("Full GCS path for stream '{}': {}/{}", stream.getName(), config.getBucketName(),
         objectKey);
 
-    this.uploadManager = S3StreamTransferManagerHelper.getDefault(config.getBucketName(), objectKey, s3Client);
+    this.uploadManager = S3StreamTransferManagerHelper.getDefault(
+        config.getBucketName(), objectKey, s3Client, config.getFormatConfig().getPartSize());
     // We only need one output stream as we only have one input stream. This is reasonably performant.
     this.outputStream = uploadManager.getMultiPartOutputStreams().get(0);
     this.csvPrinter = new CSVPrinter(new PrintWriter(outputStream, true, StandardCharsets.UTF_8),

airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/jsonl/GcsJsonlWriter.java

Lines changed: 3 additions & 1 deletion
@@ -67,7 +67,9 @@ public GcsJsonlWriter(GcsDestinationConfig config,
 
     LOGGER.info("Full GCS path for stream '{}': {}/{}", stream.getName(), config.getBucketName(), objectKey);
 
-    this.uploadManager = S3StreamTransferManagerHelper.getDefault(config.getBucketName(), objectKey, s3Client);
+    this.uploadManager = S3StreamTransferManagerHelper.getDefault(
+        config.getBucketName(), objectKey, s3Client, config.getFormatConfig().getPartSize());
+
     // We only need one output stream as we only have one input stream. This is reasonably performant.
     this.outputStream = uploadManager.getMultiPartOutputStreams().get(0);
     this.printWriter = new PrintWriter(outputStream, true, StandardCharsets.UTF_8);

airbyte-integrations/connectors/destination-gcs/src/main/resources/spec.json

Lines changed: 21 additions & 0 deletions
@@ -229,6 +229,13 @@
             }
           }
         ]
+        },
+        "part_size_mb": {
+          "title": "Block Size (MB) for GCS multipart upload",
+          "description": "This is the size of a \"Part\" being buffered in memory. It limits the memory usage when writing. Larger values allow uploading bigger files and improve speed, but consume more memory. Allowed values: min=5MB, max=525MB. Default: 5MB.",
+          "type": "integer",
+          "default": 5,
+          "examples": [5]
         }
       }
     },
@@ -247,6 +254,13 @@
           "description": "Whether the input json data should be normalized (flattened) in the output CSV. Please refer to docs for details.",
           "default": "No flattening",
           "enum": ["No flattening", "Root level flattening"]
+        },
+        "part_size_mb": {
+          "title": "Block Size (MB) for GCS multipart upload",
+          "description": "This is the size of a \"Part\" being buffered in memory. It limits the memory usage when writing. Larger values allow uploading bigger files and improve speed, but consume more memory. Allowed values: min=5MB, max=525MB. Default: 5MB.",
+          "type": "integer",
+          "default": 5,
+          "examples": [5]
         }
       }
     },
@@ -258,6 +272,13 @@
           "type": "string",
           "enum": ["JSONL"],
           "default": "JSONL"
+        },
+        "part_size_mb": {
+          "title": "Block Size (MB) for GCS multipart upload",
+          "description": "This is the size of a \"Part\" being buffered in memory. It limits the memory usage when writing. Larger values allow uploading bigger files and improve speed, but consume more memory. Allowed values: min=5MB, max=525MB. Default: 5MB.",
+          "type": "integer",
+          "default": 5,
+          "examples": [5]
        }
      }
    },
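
The same part_size_mb option is added to each GCS output format (Avro, CSV, and JSONL) in the spec above. Because the multipart upload is still capped at 10,000 parts (an S3 API limit, not something configured in this commit), the chosen part size directly determines the largest object a single sync can produce. The snippet below only illustrates that trade-off.

public class PartSizeSelection {

  public static void main(final String[] args) {
    // With at most 10,000 parts per upload, part_size_mb bounds the largest writable object.
    final long maxParts = 10_000;
    for (final long partSizeMb : new long[] {5, 25, 100, 525}) {
      final double maxObjectGib = partSizeMb * maxParts / 1024.0;
      System.out.printf("part_size_mb=%d -> max object ≈ %.0f GiB%n", partSizeMb, maxObjectGib);
    }
    // The 525 MB maximum allowed by the spec pushes the ceiling into the multi-terabyte range,
    // at the cost of buffering that much data in memory per part.
  }

}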
