@@ -17,65 +17,70 @@ import io.airbyte.cdk.integrations.destination.jdbc.copy.StreamCopier
17
17
import io.airbyte.commons.json.Jsons
18
18
import io.airbyte.protocol.models.v0.AirbyteRecordMessage
19
19
import io.airbyte.protocol.models.v0.DestinationSyncMode
20
- import org.apache.commons.csv.CSVFormat
21
- import org.apache.commons.csv.CSVPrinter
22
- import org.slf4j.Logger
23
- import org.slf4j.LoggerFactory
24
20
import java.io.*
25
21
import java.nio.charset.StandardCharsets
26
22
import java.sql.SQLException
27
23
import java.sql.Timestamp
28
24
import java.time.Instant
29
25
import java.util.*
30
26
import java.util.function.Consumer
27
+ import org.apache.commons.csv.CSVFormat
28
+ import org.apache.commons.csv.CSVPrinter
29
+ import org.slf4j.Logger
30
+ import org.slf4j.LoggerFactory
31
+
32
+ abstract class AzureBlobStorageStreamCopier (
33
+ protected val stagingFolder : String ,
34
+ private val destSyncMode : DestinationSyncMode ,
35
+ protected val schemaName : String ,
36
+ protected val streamName : String ,
37
+ private val specializedBlobClientBuilder : SpecializedBlobClientBuilder ,
38
+ protected val db : JdbcDatabase ,
39
+ protected val azureBlobConfig : AzureBlobStorageConfig ,
40
+ private val nameTransformer : StandardNameTransformer ,
41
+ private val sqlOperations : SqlOperations
42
+ ) : StreamCopier {
43
+ protected var filenameGenerator: StagingFilenameGenerator =
44
+ StagingFilenameGenerator (
45
+ streamName,
46
+ GlobalDataSizeConstants .DEFAULT_MAX_BATCH_SIZE_BYTES .toLong()
47
+ )
48
+ protected val azureStagingFiles: MutableSet <String > = HashSet ()
31
49
32
- abstract class AzureBlobStorageStreamCopier (protected val stagingFolder : String ,
33
- private val destSyncMode : DestinationSyncMode ,
34
- protected val schemaName : String ,
35
- protected val streamName : String ,
36
- private val specializedBlobClientBuilder : SpecializedBlobClientBuilder ,
37
- protected val db : JdbcDatabase ,
38
- protected val azureBlobConfig : AzureBlobStorageConfig ,
39
- private val nameTransformer : StandardNameTransformer ,
40
- private val sqlOperations : SqlOperations ) : StreamCopier {
41
- protected var filenameGenerator: StagingFilenameGenerator = StagingFilenameGenerator (streamName, GlobalDataSizeConstants .DEFAULT_MAX_BATCH_SIZE_BYTES .toLong())
42
- protected val azureStagingFiles: MutableSet <String ?> = HashSet ()
43
-
44
- @get:VisibleForTesting
45
- val tmpTableName: String = nameTransformer.getTmpTableName(streamName)
50
+ @get:VisibleForTesting val tmpTableName: String = nameTransformer.getTmpTableName(streamName)
46
51
// Staging file names recorded by prepareStagingFile() when it creates a new staging file.
protected val activeStagingWriterFileNames: MutableSet <String ?> = HashSet ()
47
52
// CSV printers keyed by staging file name; filled in prepareStagingFile(), looked up in write().
private val csvPrinters = HashMap <String ?, CSVPrinter >()
48
53
// Append-blob clients keyed by staging file name; filled in prepareStagingFile().
private val blobClients = HashMap <String ?, AppendBlobClient >()
49
54
// Staging file most recently returned by prepareAzureStagingFile(); null until
// prepareStagingFile() has run.
override var currentFile: String? = null
50
- private set
51
-
52
- fun getAzureStagingFiles (): Set <String ?> {
53
- return azureStagingFiles
54
- }
55
55
56
56
/**
 * Appends one record as a CSV row to the printer registered for [azureFileName].
 *
 * The row has three columns: [id], the record's `data` serialized through
 * `Jsons.serialize`, and the record's `emittedAt` epoch-millis converted to a
 * `java.sql.Timestamp`.
 *
 * If no printer is registered under [azureFileName], the call does nothing and the record
 * is not written.
 *
 * NOTE(review): [recordMessage] is declared nullable but is dereferenced with `!!`, so a
 * null message throws a NullPointerException whenever a printer exists for the file.
 *
 * NOTE(review): the `-` and `+` lines below are the same `printRecord` call; only its
 * formatting differs.
 */
@Throws(Exception ::class )
57
57
override fun write (id : UUID ? , recordMessage : AirbyteRecordMessage ? , azureFileName : String? ) {
58
58
// Only files that already have a printer registered are written to.
if (csvPrinters.containsKey(azureFileName)) {
59
- csvPrinters[azureFileName]!! .printRecord(id,
60
- Jsons .serialize(recordMessage!! .data),
61
- Timestamp .from(Instant .ofEpochMilli(recordMessage.emittedAt)))
59
+ csvPrinters[azureFileName]!! .printRecord(
60
+ id,
61
+ Jsons .serialize(recordMessage!! .data),
62
+ Timestamp .from(Instant .ofEpochMilli(recordMessage.emittedAt))
63
+ )
62
64
}
63
65
}
64
66
65
67
override fun prepareStagingFile (): String? {
66
68
currentFile = prepareAzureStagingFile()
69
+ val currentFile = this .currentFile!!
67
70
if (! azureStagingFiles.contains(currentFile)) {
68
71
azureStagingFiles.add(currentFile)
69
72
activeStagingWriterFileNames.add(currentFile)
70
73
71
- val appendBlobClient = specializedBlobClientBuilder
72
- .blobName(currentFile)
73
- .buildAppendBlobClient()
74
+ val appendBlobClient =
75
+ specializedBlobClientBuilder.blobName(currentFile).buildAppendBlobClient()
74
76
blobClients[currentFile] = appendBlobClient
75
77
appendBlobClient.create(true )
76
78
77
79
val bufferedOutputStream =
78
- BufferedOutputStream (appendBlobClient.blobOutputStream, Math .toIntExact(GlobalDataSizeConstants .MAX_FILE_SIZE ))
80
+ BufferedOutputStream (
81
+ appendBlobClient.blobOutputStream,
82
+ Math .toIntExact(GlobalDataSizeConstants .MAX_FILE_SIZE )
83
+ )
79
84
val writer = PrintWriter (bufferedOutputStream, true , StandardCharsets .UTF_8 )
80
85
try {
81
86
csvPrinters[currentFile] = CSVPrinter (writer, CSVFormat .DEFAULT )
@@ -87,7 +92,12 @@ abstract class AzureBlobStorageStreamCopier(protected val stagingFolder: String,
87
92
}
88
93
89
94
/**
 * Builds the staging file name by joining `stagingFolder`, `schemaName` and
 * `filenameGenerator.stagingFilename` with the separator literal.
 *
 * NOTE(review): the separator renders here with a leading space before the slash. That is
 * presumably an artifact of how this view was extracted; confirm against the real file
 * before relying on the exact literal.
 *
 * NOTE(review): the `-` and `+` lines are the same `String.join` call, reformatted.
 */
private fun prepareAzureStagingFile (): String {
90
- return java.lang.String .join(" /" , stagingFolder, schemaName, filenameGenerator.stagingFilename)
95
+ return java.lang.String .join(
96
+ " /" ,
97
+ stagingFolder,
98
+ schemaName,
99
+ filenameGenerator.stagingFilename
100
+ )
91
101
}
92
102
93
103
@Throws(Exception ::class )
@@ -107,22 +117,48 @@ abstract class AzureBlobStorageStreamCopier(protected val stagingFolder: String,
107
117
108
118
/**
 * Logs the stream, schema and tmp table name, then asks `sqlOperations` to create the tmp
 * table (`tmpTableName` in `schemaName`) via `createTableIfNotExists`.
 *
 * NOTE(review): the `-` and `+` lines are the same log statement, reformatted.
 */
@Throws(Exception ::class )
109
119
override fun createTemporaryTable () {
110
- LOGGER .info(" Preparing tmp table in destination for stream: {}, schema: {}, tmp table name: {}." , streamName, schemaName, tmpTableName)
120
+ LOGGER .info(
121
+ " Preparing tmp table in destination for stream: {}, schema: {}, tmp table name: {}." ,
122
+ streamName,
123
+ schemaName,
124
+ tmpTableName
125
+ )
111
126
sqlOperations.createTableIfNotExists(db, schemaName, tmpTableName)
112
127
}
113
128
114
129
/**
 * Copies every file in `azureStagingFiles` into the tmp table.
 *
 * For each staging file, calls the abstract [copyAzureBlobCsvFileIntoTable] with `db`, the
 * file's full path from [getFullAzurePath], `schemaName`, `tmpTableName` and
 * `azureBlobConfig`. An info message is logged before the loop and another after it.
 *
 * NOTE(review): each `-`/`+` pair below is the same statement, reformatted.
 */
@Throws(Exception ::class )
115
130
override fun copyStagingFileToTemporaryTable () {
116
- LOGGER .info(" Starting copy to tmp table: {} in destination for stream: {}, schema: {}." , tmpTableName, streamName, schemaName)
131
+ LOGGER .info(
132
+ " Starting copy to tmp table: {} in destination for stream: {}, schema: {}." ,
133
+ tmpTableName,
134
+ streamName,
135
+ schemaName
136
+ )
117
137
// One copy call per staging file recorded by prepareStagingFile().
for (azureStagingFile in azureStagingFiles) {
118
- copyAzureBlobCsvFileIntoTable(db, getFullAzurePath(azureStagingFile), schemaName, tmpTableName, azureBlobConfig)
138
+ copyAzureBlobCsvFileIntoTable(
139
+ db,
140
+ getFullAzurePath(azureStagingFile),
141
+ schemaName,
142
+ tmpTableName,
143
+ azureBlobConfig
144
+ )
119
145
}
120
- LOGGER .info(" Copy to tmp table {} in destination for stream {} complete." , tmpTableName, streamName)
146
+ LOGGER .info(
147
+ " Copy to tmp table {} in destination for stream {} complete." ,
148
+ tmpTableName,
149
+ streamName
150
+ )
121
151
}
122
152
123
153
/**
 * Builds the full location string for a staging file by plain concatenation: the azure
 * scheme prefix, `azureBlobConfig.accountName`, a dot, `azureBlobConfig.endpointDomainName`,
 * a slash, `azureBlobConfig.containerName`, a slash, then [azureStagingFile].
 *
 * NOTE(review): [azureStagingFile] is nullable and is concatenated as-is, so a null value
 * ends up as the text "null" in the path.
 *
 * NOTE(review): the literals render here with a leading space inside the quotes. That is
 * presumably an extraction artifact; confirm against the real file.
 *
 * NOTE(review): the `-` and `+` lines are the same expression; only the position of the
 * `+` operators changed.
 */
private fun getFullAzurePath (azureStagingFile : String? ): String {
124
- return (" azure://" + azureBlobConfig.accountName + " ." + azureBlobConfig.endpointDomainName
125
- + " /" + azureBlobConfig.containerName + " /" + azureStagingFile)
154
+ return (" azure://" +
155
+ azureBlobConfig.accountName +
156
+ " ." +
157
+ azureBlobConfig.endpointDomainName +
158
+ " /" +
159
+ azureBlobConfig.containerName +
160
+ " /" +
161
+ azureStagingFile)
126
162
}
127
163
128
164
@Throws(Exception ::class )
@@ -137,11 +173,20 @@ abstract class AzureBlobStorageStreamCopier(protected val stagingFolder: String,
137
173
138
174
@Throws(Exception ::class )
139
175
override fun generateMergeStatement (destTableName : String? ): String? {
140
- LOGGER .info(" Preparing to merge tmp table {} to dest table: {}, schema: {}, in destination." , tmpTableName, destTableName, schemaName)
176
+ LOGGER .info(
177
+ " Preparing to merge tmp table {} to dest table: {}, schema: {}, in destination." ,
178
+ tmpTableName,
179
+ destTableName,
180
+ schemaName
181
+ )
141
182
val queries = StringBuilder ()
142
183
if (destSyncMode == DestinationSyncMode .OVERWRITE ) {
143
184
queries.append(sqlOperations.truncateTableQuery(db, schemaName, destTableName))
144
- LOGGER .info(" Destination OVERWRITE mode detected. Dest table: {}, schema: {}, truncated." , destTableName, schemaName)
185
+ LOGGER .info(
186
+ " Destination OVERWRITE mode detected. Dest table: {}, schema: {}, truncated." ,
187
+ destTableName,
188
+ schemaName
189
+ )
145
190
}
146
191
queries.append(sqlOperations.insertTableQuery(db, schemaName, tmpTableName, destTableName))
147
192
return queries.toString()
@@ -175,18 +220,22 @@ abstract class AzureBlobStorageStreamCopier(protected val stagingFolder: String,
175
220
}
176
221
177
222
/**
 * Subclass hook that loads one staged CSV blob into a table.
 *
 * Within this class it is called from `copyStagingFileToTemporaryTable`, once per staging
 * file.
 *
 * @param database receives this copier's `db` at the visible call site.
 * @param snowflakeAzureExternalStageName receives the result of `getFullAzurePath(...)` at
 *   the visible call site, i.e. the full location string of the staging file.
 * @param schema receives `schemaName` at the visible call site.
 * @param tableName receives `tmpTableName` at the visible call site.
 * @param config receives `azureBlobConfig` at the visible call site.
 * @throws SQLException declared for Java callers; the conditions depend on the
 *   implementation, which is not visible here.
 *
 * NOTE(review): the `-` and `+` lines are the same signature, reformatted.
 */
@Throws(SQLException ::class )
178
- abstract fun copyAzureBlobCsvFileIntoTable (database : JdbcDatabase ? ,
179
- snowflakeAzureExternalStageName : String? ,
180
- schema : String? ,
181
- tableName : String? ,
182
- config : AzureBlobStorageConfig ? )
223
+ abstract fun copyAzureBlobCsvFileIntoTable (
224
+ database : JdbcDatabase ? ,
225
+ snowflakeAzureExternalStageName : String? ,
226
+ schema : String? ,
227
+ tableName : String? ,
228
+ config : AzureBlobStorageConfig ?
229
+ )
183
230
184
231
companion object {
185
- private val LOGGER : Logger = LoggerFactory .getLogger(AzureBlobStorageStreamCopier ::class .java)
232
+ private val LOGGER : Logger =
233
+ LoggerFactory .getLogger(AzureBlobStorageStreamCopier ::class .java)
186
234
fun attemptAzureBlobWriteAndDelete (config : AzureBlobStorageConfig ) {
187
235
var appendBlobClient: AppendBlobClient ? = null
188
236
try {
189
- appendBlobClient = SpecializedBlobClientBuilder ()
237
+ appendBlobClient =
238
+ SpecializedBlobClientBuilder ()
190
239
.endpoint(config.endpointUrl)
191
240
.sasToken(config.sasToken)
192
241
.containerName(config.containerName)
@@ -205,21 +254,34 @@ abstract class AzureBlobStorageStreamCopier(protected val stagingFolder: String,
205
254
}
206
255
207
256
/**
 * Logs, at info level, the name and snapshot of every blob returned by
 * `containerClient.listBlobs()`.
 *
 * NOTE(review): the `-` and `+` lines are the same `forEach` call, reformatted.
 */
private fun listCreatedBlob (containerClient : BlobContainerClient ) {
208
- containerClient.listBlobs().forEach(Consumer { blobItem: BlobItem -> LOGGER .info(" Blob name: " + blobItem.name + " Snapshot: " + blobItem.snapshot) })
257
+ containerClient
258
+ .listBlobs()
259
+ .forEach(
260
+ Consumer { blobItem: BlobItem ->
261
+ LOGGER .info(
262
+ " Blob name: " + blobItem.name + " Snapshot: " + blobItem.snapshot
263
+ )
264
+ }
265
+ )
209
266
}
210
267
211
268
/**
 * Appends a fixed test string to the given append blob and logs the committed block count
 * returned by `appendBlock`.
 *
 * NOTE(review): [appendBlobClient] is declared nullable but is dereferenced with `!!`, so a
 * null client throws a NullPointerException.
 *
 * NOTE(review): each `-`/`+` pair below is the same statement, reformatted.
 */
private fun writeTestDataIntoBlob (appendBlobClient : AppendBlobClient ? ) {
212
269
val test = " test_data"
213
270
LOGGER .info(" Writing test data to Azure Blob storage: $test " )
214
- val dataStream: InputStream = ByteArrayInputStream (test.toByteArray(StandardCharsets .UTF_8 ))
271
+ val dataStream: InputStream =
272
+ ByteArrayInputStream (test.toByteArray(StandardCharsets .UTF_8 ))
215
273
216
// The length passed is the character count of the string, not the byte count of the stream.
// The two coincide only because the literal is plain ASCII.
- val blobCommittedBlockCount = appendBlobClient!! .appendBlock(dataStream, test.length.toLong())
274
+ val blobCommittedBlockCount =
275
+ appendBlobClient!!
276
+ .appendBlock(dataStream, test.length.toLong())
217
277
.blobCommittedBlockCount
218
278
219
279
LOGGER .info(" blobCommittedBlockCount: $blobCommittedBlockCount " )
220
280
}
221
281
222
- private fun getBlobContainerClient (appendBlobClient : AppendBlobClient ? ): BlobContainerClient {
282
+ private fun getBlobContainerClient (
283
+ appendBlobClient : AppendBlobClient ?
284
+ ): BlobContainerClient {
223
285
val containerClient = appendBlobClient!! .containerClient
224
286
if (! containerClient.exists()) {
225
287
containerClient.create()
0 commit comments