Skip to content

Commit ed63cff

Browse files
reverting to cdk 0.28.0Revert "core java CDK core/testFixtures to kotlin (#36190)"
This reverts commit 88a58d9.
1 parent 15eeaba commit ed63cff

File tree

1,286 files changed

+66349
-80829
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

1,286 files changed

+66349
-80829
lines changed

.github/workflows/airbyte-ci-tests.yml

-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ jobs:
3030
with:
3131
# Note: expressions within a filter are OR'ed
3232
filters: |
33-
# This list is duplicated in `pipelines/airbyte_ci/test/__init__.py`
3433
internal_poetry_packages:
3534
- airbyte-lib/**
3635
- airbyte-ci/connectors/pipelines/**

.github/workflows/python_cdk_tests.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ jobs:
3737
needs:
3838
- changes
3939
if: needs.changes.outputs.python_cdk == 'true'
40-
runs-on: ubuntu-latest
40+
runs-on: connector-test-large
4141
name: Python CDK Tests
4242
timeout-minutes: 30
4343
steps:

airbyte-cdk/java/airbyte-cdk/azure-destinations/build.gradle

-6
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,3 @@
1-
compileKotlin {
2-
compilerOptions {
3-
allWarningsAsErrors = false
4-
}
5-
}
6-
71
dependencies {
82
implementation project(':airbyte-cdk:java:airbyte-cdk:dependencies')
93
implementation project(':airbyte-cdk:java:airbyte-cdk:core')
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.cdk.integrations.destination.jdbc.copy.azure;
6+
7+
import com.fasterxml.jackson.databind.JsonNode;
8+
import java.util.Locale;
9+
10+
public class AzureBlobStorageConfig {
11+
12+
private static final String DEFAULT_STORAGE_ENDPOINT_DOMAIN_NAME = "blob.core.windows.net";
13+
14+
private final String endpointDomainName;
15+
private final String accountName;
16+
private final String containerName;
17+
private final String sasToken;
18+
19+
public AzureBlobStorageConfig(
20+
String endpointDomainName,
21+
String accountName,
22+
String containerName,
23+
String sasToken) {
24+
this.endpointDomainName = endpointDomainName;
25+
this.accountName = accountName;
26+
this.containerName = containerName;
27+
this.sasToken = sasToken;
28+
}
29+
30+
public String getEndpointDomainName() {
31+
return endpointDomainName == null ? DEFAULT_STORAGE_ENDPOINT_DOMAIN_NAME : endpointDomainName;
32+
}
33+
34+
public String getAccountName() {
35+
return accountName;
36+
}
37+
38+
public String getContainerName() {
39+
return containerName;
40+
}
41+
42+
public String getSasToken() {
43+
return sasToken;
44+
}
45+
46+
public String getEndpointUrl() {
47+
return String.format(Locale.ROOT, "https://%s.%s", getAccountName(), getEndpointDomainName());
48+
}
49+
50+
public static AzureBlobStorageConfig getAzureBlobConfig(JsonNode config) {
51+
52+
return new AzureBlobStorageConfig(
53+
config.get("azure_blob_storage_endpoint_domain_name") == null ? DEFAULT_STORAGE_ENDPOINT_DOMAIN_NAME
54+
: config.get("azure_blob_storage_endpoint_domain_name").asText(),
55+
config.get("azure_blob_storage_account_name").asText(),
56+
config.get("azure_blob_storage_container_name").asText(),
57+
config.get("azure_blob_storage_sas_token").asText());
58+
59+
}
60+
61+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,276 @@
1+
/*
2+
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.cdk.integrations.destination.jdbc.copy.azure;
6+
7+
import com.azure.storage.blob.BlobContainerClient;
8+
import com.azure.storage.blob.specialized.AppendBlobClient;
9+
import com.azure.storage.blob.specialized.SpecializedBlobClientBuilder;
10+
import com.google.common.annotations.VisibleForTesting;
11+
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
12+
import io.airbyte.cdk.integrations.destination.StandardNameTransformer;
13+
import io.airbyte.cdk.integrations.destination.jdbc.SqlOperations;
14+
import io.airbyte.cdk.integrations.destination.jdbc.StagingFilenameGenerator;
15+
import io.airbyte.cdk.integrations.destination.jdbc.constants.GlobalDataSizeConstants;
16+
import io.airbyte.cdk.integrations.destination.jdbc.copy.StreamCopier;
17+
import io.airbyte.commons.json.Jsons;
18+
import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
19+
import io.airbyte.protocol.models.v0.DestinationSyncMode;
20+
import java.io.BufferedOutputStream;
21+
import java.io.ByteArrayInputStream;
22+
import java.io.IOException;
23+
import java.io.InputStream;
24+
import java.io.PrintWriter;
25+
import java.nio.charset.StandardCharsets;
26+
import java.sql.SQLException;
27+
import java.sql.Timestamp;
28+
import java.time.Instant;
29+
import java.util.HashMap;
30+
import java.util.HashSet;
31+
import java.util.Set;
32+
import java.util.UUID;
33+
import org.apache.commons.csv.CSVFormat;
34+
import org.apache.commons.csv.CSVPrinter;
35+
import org.slf4j.Logger;
36+
import org.slf4j.LoggerFactory;
37+
38+
public abstract class AzureBlobStorageStreamCopier implements StreamCopier {
39+
40+
private static final Logger LOGGER = LoggerFactory.getLogger(AzureBlobStorageStreamCopier.class);
41+
protected StagingFilenameGenerator filenameGenerator;
42+
protected final String stagingFolder;
43+
protected final Set<String> azureStagingFiles = new HashSet<>();
44+
protected final AzureBlobStorageConfig azureBlobConfig;
45+
protected final String tmpTableName;
46+
protected final String schemaName;
47+
protected final String streamName;
48+
protected final JdbcDatabase db;
49+
protected final Set<String> activeStagingWriterFileNames = new HashSet<>();
50+
private final StandardNameTransformer nameTransformer;
51+
private final SqlOperations sqlOperations;
52+
private final DestinationSyncMode destSyncMode;
53+
private final SpecializedBlobClientBuilder specializedBlobClientBuilder;
54+
private final HashMap<String, CSVPrinter> csvPrinters = new HashMap<>();
55+
private final HashMap<String, AppendBlobClient> blobClients = new HashMap<>();
56+
private String currentFile;
57+
58+
public AzureBlobStorageStreamCopier(final String stagingFolder,
59+
final DestinationSyncMode destSyncMode,
60+
final String schema,
61+
final String streamName,
62+
final SpecializedBlobClientBuilder specializedBlobClientBuilder,
63+
final JdbcDatabase db,
64+
final AzureBlobStorageConfig azureBlobConfig,
65+
final StandardNameTransformer nameTransformer,
66+
final SqlOperations sqlOperations) {
67+
this.stagingFolder = stagingFolder;
68+
this.destSyncMode = destSyncMode;
69+
this.schemaName = schema;
70+
this.streamName = streamName;
71+
this.db = db;
72+
this.nameTransformer = nameTransformer;
73+
this.sqlOperations = sqlOperations;
74+
this.tmpTableName = nameTransformer.getTmpTableName(streamName);
75+
this.specializedBlobClientBuilder = specializedBlobClientBuilder;
76+
this.azureBlobConfig = azureBlobConfig;
77+
this.filenameGenerator = new StagingFilenameGenerator(streamName, GlobalDataSizeConstants.DEFAULT_MAX_BATCH_SIZE_BYTES);
78+
}
79+
80+
public static void attemptAzureBlobWriteAndDelete(final AzureBlobStorageConfig config) {
81+
AppendBlobClient appendBlobClient = null;
82+
try {
83+
appendBlobClient = new SpecializedBlobClientBuilder()
84+
.endpoint(config.getEndpointUrl())
85+
.sasToken(config.getSasToken())
86+
.containerName(config.getContainerName())
87+
.blobName("testAzureBlob" + UUID.randomUUID())
88+
.buildAppendBlobClient();
89+
90+
final BlobContainerClient containerClient = getBlobContainerClient(appendBlobClient);
91+
writeTestDataIntoBlob(appendBlobClient);
92+
listCreatedBlob(containerClient);
93+
} finally {
94+
if (appendBlobClient != null && appendBlobClient.exists()) {
95+
LOGGER.info("Deleting blob: " + appendBlobClient.getBlobName());
96+
appendBlobClient.delete();
97+
}
98+
}
99+
100+
}
101+
102+
private static void listCreatedBlob(final BlobContainerClient containerClient) {
103+
containerClient.listBlobs().forEach(blobItem -> LOGGER.info("Blob name: " + blobItem.getName() + "Snapshot: " + blobItem.getSnapshot()));
104+
}
105+
106+
private static void writeTestDataIntoBlob(final AppendBlobClient appendBlobClient) {
107+
final String test = "test_data";
108+
LOGGER.info("Writing test data to Azure Blob storage: " + test);
109+
final InputStream dataStream = new ByteArrayInputStream(test.getBytes(StandardCharsets.UTF_8));
110+
111+
final Integer blobCommittedBlockCount = appendBlobClient.appendBlock(dataStream, test.length())
112+
.getBlobCommittedBlockCount();
113+
114+
LOGGER.info("blobCommittedBlockCount: " + blobCommittedBlockCount);
115+
}
116+
117+
private static BlobContainerClient getBlobContainerClient(final AppendBlobClient appendBlobClient) {
118+
final BlobContainerClient containerClient = appendBlobClient.getContainerClient();
119+
if (!containerClient.exists()) {
120+
containerClient.create();
121+
}
122+
123+
if (!appendBlobClient.exists()) {
124+
appendBlobClient.create();
125+
LOGGER.info("blobContainerClient created");
126+
} else {
127+
LOGGER.info("blobContainerClient already exists");
128+
}
129+
return containerClient;
130+
}
131+
132+
public Set<String> getAzureStagingFiles() {
133+
return azureStagingFiles;
134+
}
135+
136+
@Override
137+
public void write(final UUID id, final AirbyteRecordMessage recordMessage, final String azureFileName) throws Exception {
138+
if (csvPrinters.containsKey(azureFileName)) {
139+
csvPrinters.get(azureFileName).printRecord(id,
140+
Jsons.serialize(recordMessage.getData()),
141+
Timestamp.from(Instant.ofEpochMilli(recordMessage.getEmittedAt())));
142+
}
143+
}
144+
145+
@Override
146+
public String prepareStagingFile() {
147+
currentFile = prepareAzureStagingFile();
148+
if (!azureStagingFiles.contains(currentFile)) {
149+
150+
azureStagingFiles.add(currentFile);
151+
activeStagingWriterFileNames.add(currentFile);
152+
153+
final AppendBlobClient appendBlobClient = specializedBlobClientBuilder
154+
.blobName(currentFile)
155+
.buildAppendBlobClient();
156+
blobClients.put(currentFile, appendBlobClient);
157+
appendBlobClient.create(true);
158+
159+
final BufferedOutputStream bufferedOutputStream =
160+
new BufferedOutputStream(appendBlobClient.getBlobOutputStream(), Math.toIntExact(GlobalDataSizeConstants.MAX_FILE_SIZE));
161+
final var writer = new PrintWriter(bufferedOutputStream, true, StandardCharsets.UTF_8);
162+
try {
163+
csvPrinters.put(currentFile, new CSVPrinter(writer, CSVFormat.DEFAULT));
164+
} catch (final IOException e) {
165+
throw new RuntimeException(e);
166+
}
167+
}
168+
return currentFile;
169+
}
170+
171+
private String prepareAzureStagingFile() {
172+
return String.join("/", stagingFolder, schemaName, filenameGenerator.getStagingFilename());
173+
}
174+
175+
@Override
176+
public void closeStagingUploader(final boolean hasFailed) throws Exception {
177+
LOGGER.info("Uploading remaining data for {} stream.", streamName);
178+
for (final var csvPrinter : csvPrinters.values()) {
179+
csvPrinter.close();
180+
}
181+
LOGGER.info("All data for {} stream uploaded.", streamName);
182+
}
183+
184+
@Override
185+
public void createDestinationSchema() throws Exception {
186+
LOGGER.info("Creating schema in destination if it doesn't exist: {}", schemaName);
187+
sqlOperations.createSchemaIfNotExists(db, schemaName);
188+
}
189+
190+
@Override
191+
public void createTemporaryTable() throws Exception {
192+
LOGGER.info("Preparing tmp table in destination for stream: {}, schema: {}, tmp table name: {}.", streamName, schemaName, tmpTableName);
193+
sqlOperations.createTableIfNotExists(db, schemaName, tmpTableName);
194+
}
195+
196+
@Override
197+
public void copyStagingFileToTemporaryTable() throws Exception {
198+
LOGGER.info("Starting copy to tmp table: {} in destination for stream: {}, schema: {}.", tmpTableName, streamName, schemaName);
199+
for (final var azureStagingFile : azureStagingFiles) {
200+
copyAzureBlobCsvFileIntoTable(db, getFullAzurePath(azureStagingFile), schemaName, tmpTableName, azureBlobConfig);
201+
}
202+
LOGGER.info("Copy to tmp table {} in destination for stream {} complete.", tmpTableName, streamName);
203+
}
204+
205+
private String getFullAzurePath(final String azureStagingFile) {
206+
return "azure://" + azureBlobConfig.getAccountName() + "." + azureBlobConfig.getEndpointDomainName()
207+
+ "/" + azureBlobConfig.getContainerName() + "/" + azureStagingFile;
208+
}
209+
210+
@Override
211+
public String createDestinationTable() throws Exception {
212+
final var destTableName = nameTransformer.getRawTableName(streamName);
213+
LOGGER.info("Preparing table {} in destination.", destTableName);
214+
sqlOperations.createTableIfNotExists(db, schemaName, destTableName);
215+
LOGGER.info("Table {} in destination prepared.", tmpTableName);
216+
217+
return destTableName;
218+
}
219+
220+
@Override
221+
public String generateMergeStatement(final String destTableName) throws Exception {
222+
LOGGER.info("Preparing to merge tmp table {} to dest table: {}, schema: {}, in destination.", tmpTableName, destTableName, schemaName);
223+
final var queries = new StringBuilder();
224+
if (destSyncMode.equals(DestinationSyncMode.OVERWRITE)) {
225+
queries.append(sqlOperations.truncateTableQuery(db, schemaName, destTableName));
226+
LOGGER.info("Destination OVERWRITE mode detected. Dest table: {}, schema: {}, truncated.", destTableName, schemaName);
227+
}
228+
queries.append(sqlOperations.insertTableQuery(db, schemaName, tmpTableName, destTableName));
229+
return queries.toString();
230+
}
231+
232+
@Override
233+
public void removeFileAndDropTmpTable() throws Exception {
234+
LOGGER.info("Begin cleaning azure blob staging files.");
235+
for (final AppendBlobClient appendBlobClient : blobClients.values()) {
236+
appendBlobClient.delete();
237+
}
238+
LOGGER.info("Azure Blob staging files cleaned.");
239+
240+
LOGGER.info("Begin cleaning {} tmp table in destination.", tmpTableName);
241+
sqlOperations.dropTableIfExists(db, schemaName, tmpTableName);
242+
LOGGER.info("{} tmp table in destination cleaned.", tmpTableName);
243+
}
244+
245+
@Override
246+
public void closeNonCurrentStagingFileWriters() throws Exception {
247+
LOGGER.info("Begin closing non current file writers");
248+
final Set<String> removedKeys = new HashSet<>();
249+
for (final String key : activeStagingWriterFileNames) {
250+
if (!key.equals(currentFile)) {
251+
csvPrinters.get(key).close();
252+
csvPrinters.remove(key);
253+
removedKeys.add(key);
254+
}
255+
}
256+
activeStagingWriterFileNames.removeAll(removedKeys);
257+
}
258+
259+
@Override
260+
public String getCurrentFile() {
261+
return currentFile;
262+
}
263+
264+
@VisibleForTesting
265+
public String getTmpTableName() {
266+
return tmpTableName;
267+
}
268+
269+
public abstract void copyAzureBlobCsvFileIntoTable(JdbcDatabase database,
270+
String snowflakeAzureExternalStageName,
271+
String schema,
272+
String tableName,
273+
AzureBlobStorageConfig config)
274+
throws SQLException;
275+
276+
}

0 commit comments

Comments
 (0)