4
4
5
5
package io .airbyte .integrations .destination .databricks .s3 ;
6
6
7
- import static org .junit .jupiter .api .Assertions .assertEquals ;
8
-
7
+ import com .fasterxml .jackson .databind .ObjectMapper ;
8
+ import com .fasterxml .jackson .databind .node .ObjectNode ;
9
+ import io .airbyte .cdk .db .jdbc .JdbcDatabase ;
10
+ import io .airbyte .cdk .integrations .destination .StandardNameTransformer ;
11
+ import io .airbyte .cdk .integrations .destination .jdbc .SqlOperations ;
12
+ import io .airbyte .cdk .integrations .destination .jdbc .copy .StreamCopier ;
13
+ import io .airbyte .cdk .integrations .destination .jdbc .copy .StreamCopierFactory ;
9
14
import io .airbyte .cdk .integrations .destination .s3 .S3DestinationConfig ;
10
- import java .util .UUID ;
15
+ import io .airbyte .cdk .integrations .destination .s3 .parquet .S3ParquetWriter ;
16
+ import io .airbyte .cdk .integrations .destination .s3 .writer .ProductionWriterFactory ;
17
+ import io .airbyte .integrations .destination .databricks .DatabricksDestinationConfig ;
18
+ import io .airbyte .integrations .destination .databricks .DatabricksNameTransformer ;
19
+ import io .airbyte .protocol .models .v0 .AirbyteStream ;
20
+ import io .airbyte .protocol .models .v0 .ConfiguredAirbyteStream ;
21
+ import io .airbyte .protocol .models .v0 .SyncMode ;
11
22
import org .junit .jupiter .api .Test ;
12
23
24
+ import java .util .UUID ;
25
+
26
+ import static io .airbyte .protocol .models .v0 .CatalogHelpers .createAirbyteStream ;
27
+ import static org .junit .jupiter .api .Assertions .assertEquals ;
28
+ import static org .mockito .ArgumentMatchers .any ;
29
+ import static org .mockito .Mockito .mock ;
30
+ import static org .mockito .Mockito .when ;
31
+
13
32
class DatabricksS3StreamCopierTest {
14
33
34
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper ();
35
+ final String bucketName = UUID .randomUUID ().toString ();
36
+ final String bucketPath = UUID .randomUUID ().toString ();
37
+ final String bucketRegion = UUID .randomUUID ().toString ();
38
+ final String stagingFolder = UUID .randomUUID ().toString ();
39
+
15
40
@ Test
16
41
public void testGetStagingS3DestinationConfig () {
17
- final String bucketPath = UUID .randomUUID ().toString ();
18
42
final S3DestinationConfig config = S3DestinationConfig .create ("" , bucketPath , "" ).get ();
19
- final String stagingFolder = UUID .randomUUID ().toString ();
20
43
final S3DestinationConfig stagingConfig = DatabricksS3StreamCopier .getStagingS3DestinationConfig (config , stagingFolder );
21
44
assertEquals (String .format ("%s/%s" , bucketPath , stagingFolder ), stagingConfig .getBucketPath ());
22
45
}
23
46
24
- }
47
+ @ Test
48
+ public void testGetDestinationTablePath () {
49
+ final String namespace = UUID .randomUUID ().toString ();
50
+ final String tableName = UUID .randomUUID ().toString ();
51
+
52
+ ConfiguredAirbyteStream configuredAirbyteStream = new ConfiguredAirbyteStream ()
53
+ .withStream (createAirbyteStream (tableName , namespace ))
54
+ .withSyncMode (SyncMode .FULL_REFRESH );
55
+
56
+ final ObjectNode dataS3Config = OBJECT_MAPPER .createObjectNode ()
57
+ .put ("data_source_type" , "S3_STORAGE" )
58
+ .put ("s3_bucket_name" , bucketName )
59
+ .put ("s3_bucket_path" , bucketPath )
60
+ .put ("s3_bucket_region" , bucketRegion )
61
+ .put ("s3_access_key_id" , "access_key_id" )
62
+ .put ("s3_secret_access_key" , "secret_access_key" );
63
+
64
+ final ObjectNode config = OBJECT_MAPPER .createObjectNode ()
65
+ .put ("accept_terms" , true )
66
+ .put ("databricks_server_hostname" , "server_hostname" )
67
+ .put ("databricks_http_path" , "http_path" )
68
+ .put ("databricks_personal_access_token" , "pak" )
69
+ .set ("data_source" , dataS3Config );
70
+
71
+ DatabricksDestinationConfig databricksConfig = DatabricksDestinationConfig .get (config );
72
+ DatabricksS3StreamCopierFactory factory = new DatabricksS3StreamCopierFactory () {
73
+ @ Override
74
+ public StreamCopier create (String configuredSchema , DatabricksDestinationConfig databricksConfig , String stagingFolder , ConfiguredAirbyteStream configuredStream , StandardNameTransformer nameTransformer , JdbcDatabase database , SqlOperations sqlOperations ) {
75
+ try {
76
+ final AirbyteStream stream = configuredStream .getStream ();
77
+ final String catalogName = databricksConfig .catalog ();
78
+ final String schema = StreamCopierFactory .getSchema (stream .getNamespace (), configuredSchema , nameTransformer );
79
+
80
+ S3ParquetWriter writer = mock (S3ParquetWriter .class );
81
+ final ProductionWriterFactory writerFactory = mock (ProductionWriterFactory .class );
82
+ when (writerFactory .create (any (), any (), any (), any ())).thenReturn (writer );
83
+
84
+ return new DatabricksS3StreamCopier (stagingFolder , catalogName , schema , configuredStream , null , null ,
85
+ databricksConfig , nameTransformer , null , writerFactory , null );
86
+ } catch (final Exception e ) {
87
+ throw new RuntimeException (e );
88
+ }
89
+ }
90
+ };
91
+
92
+ final StandardNameTransformer nameTransformer = new DatabricksNameTransformer ();
93
+ DatabricksS3StreamCopier streamCopier = (DatabricksS3StreamCopier ) factory .create (databricksConfig .schema (), databricksConfig , stagingFolder , configuredAirbyteStream , nameTransformer , null , null );
94
+ assertEquals (String .format ("s3://%s/%s/%s/%s" , bucketName , bucketPath , nameTransformer .getNamespace (namespace ), tableName ), streamCopier .getDestTableLocation ());
95
+ }
96
+
97
+ }
0 commit comments