
Commit 5779333

Update spec and configs (#5792)
* changes toward creating database/tables through jdbc
* Delete DatabricksSqlOperations.java
* revert sqlops
* minor changes
1 parent: d7db844

5 files changed: +133, -23 lines


airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksDestination.java

Lines changed: 11 additions & 4 deletions

@@ -28,6 +28,7 @@
 import io.airbyte.db.Databases;
 import io.airbyte.db.jdbc.JdbcDatabase;
 import io.airbyte.integrations.base.AirbyteMessageConsumer;
+import io.airbyte.integrations.base.IntegrationRunner;
 import io.airbyte.integrations.destination.ExtendedNameTransformer;
 import io.airbyte.integrations.destination.jdbc.SqlOperations;
 import io.airbyte.integrations.destination.jdbc.copy.CopyConsumerFactory;
@@ -42,6 +43,10 @@ public class DatabricksDestination extends CopyDestination {

   private static final String DRIVER_CLASS = "com.simba.spark.jdbc.Driver";

+  public static void main(String[] args) throws Exception {
+    new IntegrationRunner(new DatabricksDestination()).run(args);
+  }
+
   @Override
   public AirbyteMessageConsumer getConsumer(JsonNode config, ConfiguredAirbyteCatalog catalog, Consumer<AirbyteMessage> outputRecordCollector) {
     return CopyConsumerFactory.create(
@@ -52,7 +57,7 @@ public AirbyteMessageConsumer getConsumer(JsonNode config, ConfiguredAirbyteCata
         S3Config.getS3Config(config),
         catalog,
         new DatabricksStreamCopierFactory(),
-        config.get("schema").asText()
+        config.get("schema").asText().equals("") ? "default" : config.get("schema").asText()
     );
   }

@@ -69,9 +74,11 @@ public ExtendedNameTransformer getNameTransformer() {
   @Override
   public JdbcDatabase getDatabase(JsonNode databricksConfig) {
     return Databases.createJdbcDatabase(
-        databricksConfig.get("username").asText(),
-        databricksConfig.has("password") ? databricksConfig.get("password").asText() : null,
-        databricksConfig.get("jdbc_url").asText(),
+        "token",
+        databricksConfig.get("pat").asText(),
+        String.format("jdbc:spark://%s:443/default;transportMode=http;ssl=1;httpPath=%s",
+            databricksConfig.get("serverHostname").asText(),
+            databricksConfig.get("httpPath").asText()),
         DRIVER_CLASS
     );
   }
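
For reference, the connector no longer accepts a raw jdbc_url: getDatabase now assembles the Simba Spark JDBC URL from the serverHostname and httpPath config fields and signs in as user "token" with the personal access token. A minimal sketch of the resulting URL, using made-up workspace values that are not part of this commit:

```java
// Sketch only: the hostname and HTTP path below are hypothetical placeholders.
public class DatabricksJdbcUrlSketch {

  public static void main(String[] args) {
    String serverHostname = "dbc-12345678-90ab.cloud.databricks.com"; // hypothetical
    String httpPath = "sql/protocolv1/o/0/0123-456789-abcdefgh";      // hypothetical

    // Same format string that getDatabase uses above.
    String jdbcUrl = String.format(
        "jdbc:spark://%s:443/default;transportMode=http;ssl=1;httpPath=%s",
        serverHostname, httpPath);

    System.out.println(jdbcUrl);
    // jdbc:spark://dbc-12345678-90ab.cloud.databricks.com:443/default;transportMode=http;ssl=1;httpPath=sql/protocolv1/o/0/0123-456789-abcdefgh
  }
}
```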

airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopier.java

Lines changed: 40 additions & 12 deletions

@@ -14,6 +14,7 @@
 import io.airbyte.protocol.models.AirbyteRecordMessage;
 import io.airbyte.protocol.models.AirbyteStream;
 import io.airbyte.protocol.models.ConfiguredAirbyteStream;
+import io.airbyte.protocol.models.DestinationSyncMode;
 import java.sql.Timestamp;
 import java.util.UUID;
 import org.slf4j.Logger;
@@ -31,15 +32,20 @@ public class DatabricksStreamCopier implements StreamCopier {
   private final AmazonS3 s3Client;
   private final S3Config s3Config;
   private final String tmpTableName;
+  private final DestinationSyncMode syncMode;
   private final AirbyteStream stream;
   private final JdbcDatabase db;
+  private final String database;
+  private final String streamName;
   private final ExtendedNameTransformer nameTransformer;
-  private final SqlOperations sqlOperations;
+  private final DatabricksSqlOperations sqlOperations;
   private final S3ParquetWriter parquetWriter;

   public DatabricksStreamCopier(String stagingFolder,
+                                DestinationSyncMode syncMode,
                                 String schema,
                                 ConfiguredAirbyteStream configuredStream,
+                                String streamName,
                                 AmazonS3 s3Client,
                                 JdbcDatabase db,
                                 S3Config s3Config,
@@ -48,14 +54,18 @@ public DatabricksStreamCopier(String stagingFolder,
                                 S3WriterFactory writerFactory,
                                 Timestamp uploadTime) throws Exception {
     this.stream = configuredStream.getStream();
+    this.syncMode = syncMode;
     this.db = db;
+    this.database = schema;
+    this.streamName = streamName;
     this.nameTransformer = nameTransformer;
-    this.sqlOperations = sqlOperations;
-    this.tmpTableName = nameTransformer.getTmpTableName(stream.getName());
+    this.sqlOperations = (DatabricksSqlOperations) sqlOperations;
+    this.tmpTableName = nameTransformer.getTmpTableName(streamName);
     this.s3Client = s3Client;
     this.s3Config = s3Config;
     this.parquetWriter = (S3ParquetWriter) writerFactory
         .create(getS3DestinationConfig(s3Config, stagingFolder), s3Client, configuredStream, uploadTime);
+    LOGGER.info(parquetWriter.parquetSchema.toString());
   }

   @Override
@@ -69,28 +79,46 @@ public void closeStagingUploader(boolean hasFailed) throws Exception {
   }

   @Override
-  public void createTemporaryTable() throws Exception {
-
+  public void createDestinationSchema() throws Exception {
+    LOGGER.info("Creating database in destination if it doesn't exist: {}", database);
+    sqlOperations.createSchemaIfNotExists(db, database);
   }

   @Override
-  public void copyStagingFileToTemporaryTable() throws Exception {
-
+  public void createTemporaryTable() throws Exception {
+    LOGGER.info("Preparing tmp table in destination for stream: {}, database: {}, tmp table name: {}.", streamName, database, tmpTableName);
+    LOGGER.info(parquetWriter.parquetSchema.toString());
+    sqlOperations.createTableIfNotExists(db, database, tmpTableName);
   }

   @Override
-  public void createDestinationSchema() throws Exception {
-
+  public void copyStagingFileToTemporaryTable() throws Exception {
+    LOGGER.info("Starting copy to tmp table: {} in destination for stream: {}, database: {}, .", tmpTableName, streamName, database);
+    // TODO: load data sql operation
+    LOGGER.info("Copy to tmp table {} in destination for stream {} complete.", tmpTableName, streamName);
   }

+
   @Override
   public String createDestinationTable() throws Exception {
-    return null;
+    var destTableName = nameTransformer.getRawTableName(streamName);
+    LOGGER.info("Preparing table {} in destination.", destTableName);
+    sqlOperations.createTableIfNotExists(db, database, destTableName);
+    LOGGER.info("Table {} in destination prepared.", tmpTableName);
+
+    return destTableName;
   }

   @Override
-  public String generateMergeStatement(String destTableName) throws Exception {
-    return null;
+  public String generateMergeStatement(String destTableName) {
+    LOGGER.info("Preparing to merge tmp table {} to dest table: {}, database: {}, in destination.", tmpTableName, destTableName, database);
+    var queries = new StringBuilder();
+    if (syncMode.equals(DestinationSyncMode.OVERWRITE)) {
+      queries.append(sqlOperations.truncateTableQuery(db, database, destTableName));
+      LOGGER.info("Destination OVERWRITE mode detected. Dest table: {}, database: {}, truncated.", destTableName, database);
+    }
+    queries.append(sqlOperations.copyTableQuery(db, database, tmpTableName, destTableName));
+    return queries.toString();
   }

   @Override
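
DatabricksSqlOperations itself is not included in this diff (the commit message notes it was deleted and the sql ops reverted), so only the call sites above pin down its shape. A hedged sketch of the kind of Spark SQL such helpers could emit, with the class name, method bodies, and raw-table columns all assumed rather than taken from this commit (the real methods also receive the JdbcDatabase handle, omitted here for brevity):

```java
// Hypothetical sketch, not the committed DatabricksSqlOperations; identifier escaping omitted.
public class DatabricksSqlOperationsSketch {

  // Backs createSchemaIfNotExists(db, database) in createDestinationSchema().
  public String createSchemaQuery(String database) {
    return String.format("CREATE DATABASE IF NOT EXISTS %s;", database);
  }

  // Backs createTableIfNotExists(db, database, tableName); the _airbyte_* columns are an
  // assumption based on Airbyte's usual raw-table layout.
  public String createTableQuery(String database, String tableName) {
    return String.format(
        "CREATE TABLE IF NOT EXISTS %s.%s (_airbyte_ab_id STRING, _airbyte_data STRING, _airbyte_emitted_at TIMESTAMP);",
        database, tableName);
  }

  // Used by generateMergeStatement() when the destination sync mode is OVERWRITE.
  public String truncateTableQuery(String database, String tableName) {
    return String.format("TRUNCATE TABLE %s.%s;", database, tableName);
  }

  // Appends the tmp table's rows into the destination table.
  public String copyTableQuery(String database, String srcTableName, String dstTableName) {
    return String.format("INSERT INTO %s.%s SELECT * FROM %s.%s;", database, dstTableName, database, srcTableName);
  }
}
```

Under OVERWRITE, generateMergeStatement therefore concatenates a truncate followed by the copy; in other sync modes it returns the copy statement alone.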

airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopierFactory.java

Lines changed: 3 additions & 1 deletion

@@ -13,6 +13,7 @@
 import io.airbyte.integrations.destination.s3.writer.S3WriterFactory;
 import io.airbyte.protocol.models.AirbyteStream;
 import io.airbyte.protocol.models.ConfiguredAirbyteStream;
+import io.airbyte.protocol.models.DestinationSyncMode;
 import java.sql.Timestamp;

 public class DatabricksStreamCopierFactory implements StreamCopierFactory<S3Config> {
@@ -27,13 +28,14 @@ public StreamCopier create(String configuredSchema,
                              SqlOperations sqlOperations) {
     try {
       AirbyteStream stream = configuredStream.getStream();
+      DestinationSyncMode syncMode = configuredStream.getDestinationSyncMode();
       String schema = StreamCopierFactory.getSchema(stream, configuredSchema, nameTransformer);
       AmazonS3 s3Client = S3StreamCopier.getAmazonS3(s3Config);
       S3WriterFactory writerFactory = new ProductionWriterFactory();
       Timestamp uploadTimestamp = new Timestamp(System.currentTimeMillis());

       return new DatabricksStreamCopier(
-          stagingFolder, schema, configuredStream, s3Client, db, s3Config, nameTransformer, sqlOperations, writerFactory, uploadTimestamp);
+          stagingFolder, syncMode, schema, configuredStream, stream.getName(), s3Client, db, s3Config, nameTransformer, sqlOperations, writerFactory, uploadTimestamp);
     } catch (Exception e) {
       throw new RuntimeException(e);
     }

airbyte-integrations/connectors/destination-databricks/src/main/resources/spec.json

Lines changed: 77 additions & 6 deletions

@@ -9,22 +9,93 @@
   "title": "Databricks Destination Spec",
   "type": "object",
   "required": [
-    "jdbcUrl"
+    "serverHostname",
+    "httpPath",
+    "pat"
   ],
   "additionalProperties": false,
   "properties": {
-    "jdbcUrl": {
-      "title": "JDBC URL",
+    "serverHostname": {
+      "title": "Server Hostname",
+      "type": "string",
+      "description": "",
+      "examples": [""]
+    },
+    "httpPath": {
+      "title": "HTTP Path",
+      "type": "string",
+      "description": "",
+      "examples": [""]
+    },
+    "pat": {
+      "title": "Personal Access Token",
       "type": "string",
       "description": "",
       "examples": [""],
       "airbyte_secret": true
     },
-    "database": {
+    "schema": {
       "title": "Database",
       "type": "string",
-      "description": "",
-      "examples": [""]
+      "description": ""
+    },
+    "s3_bucket_name": {
+      "title": "S3 Bucket Name",
+      "type": "string",
+      "description": "The name of the S3 bucket to use for intermittent staging of the data.",
+      "examples": ["airbyte.staging"]
+    },
+    "s3_bucket_region": {
+      "title": "S3 Bucket Region",
+      "type": "string",
+      "default": "",
+      "description": "The region of the S3 staging bucket to use if utilising a copy strategy.",
+      "enum": [
+        "",
+        "us-east-1",
+        "us-east-2",
+        "us-west-1",
+        "us-west-2",
+        "af-south-1",
+        "ap-east-1",
+        "ap-south-1",
+        "ap-northeast-1",
+        "ap-northeast-2",
+        "ap-northeast-3",
+        "ap-southeast-1",
+        "ap-southeast-2",
+        "ca-central-1",
+        "cn-north-1",
+        "cn-northwest-1",
+        "eu-central-1",
+        "eu-north-1",
+        "eu-south-1",
+        "eu-west-1",
+        "eu-west-2",
+        "eu-west-3",
+        "sa-east-1",
+        "me-south-1"
+      ]
+    },
+    "access_key_id": {
+      "type": "string",
+      "description": "The Access Key Id granting allow one to access the above S3 staging bucket. Airbyte requires Read and Write permissions to the given bucket.",
+      "title": "S3 Key Id",
+      "airbyte_secret": true
+    },
+    "secret_access_key": {
+      "type": "string",
+      "description": "The corresponding secret to the above access key id.",
+      "title": "S3 Access Key",
+      "airbyte_secret": true
+    },
+    "part_size": {
+      "type": "integer",
+      "minimum": 10,
+      "maximum": 100,
+      "examples": ["10"],
+      "description": "Optional. Increase this if syncing tables larger than 100GB. Files are streamed to S3 in parts. This determines the size of each part, in MBs. As S3 has a limit of 10,000 parts per file, part size affects the table size. This is 10MB by default, resulting in a default limit of 100GB tables. Note, a larger part size will result in larger memory requirements. A rule of thumb is to multiply the part size by 10 to get the memory requirement. Modify this with care.",
+      "title": "Stream Part Size"
     }
   }
 }
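
For illustration, a config document satisfying the updated spec might look like the one parsed below; every value is a placeholder invented for this example, not taken from the commit. The S3 fields presumably feed S3Config.getS3Config(config) in getConsumer, while getDatabase reads serverHostname, httpPath, and pat.

```java
// Hypothetical example config for the updated spec; all values are placeholders.
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class SampleDatabricksConfig {

  public static void main(String[] args) throws Exception {
    String json = """
        {
          "serverHostname": "dbc-12345678-90ab.cloud.databricks.com",
          "httpPath": "sql/protocolv1/o/0/0123-456789-abcdefgh",
          "pat": "dapi-placeholder-token",
          "schema": "default",
          "s3_bucket_name": "airbyte.staging",
          "s3_bucket_region": "us-east-1",
          "access_key_id": "placeholder-key-id",
          "secret_access_key": "placeholder-secret",
          "part_size": 10
        }
        """;
    JsonNode config = new ObjectMapper().readTree(json);

    // getConsumer falls back to "default" when schema is the empty string.
    System.out.println(config.get("serverHostname").asText());
  }
}
```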

airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/parquet/S3ParquetWriter.java

Lines changed: 2 additions & 0 deletions

@@ -56,6 +56,7 @@ public class S3ParquetWriter extends BaseS3Writer implements S3Writer {

   private final ParquetWriter<Record> parquetWriter;
   private final AvroRecordFactory avroRecordFactory;
+  public final Schema parquetSchema;

   public S3ParquetWriter(S3DestinationConfig config,
                          AmazonS3 s3Client,
@@ -88,6 +89,7 @@ public S3ParquetWriter(S3DestinationConfig config,
         .withDictionaryEncoding(formatConfig.isDictionaryEncoding())
         .build();
     this.avroRecordFactory = new AvroRecordFactory(schema, nameUpdater);
+    this.parquetSchema = schema;
   }

   public static Configuration getHadoopConfig(S3DestinationConfig config) {
