
Commit e837048

tuliren, Phlair, and sherifnada authored
🎉 New Destination: Databricks (#5998)
Implement the new destination connector for Databricks Delta Lake. Resolves #2075.

Co-authored-by: George Claireaux <[email protected]>
Co-authored-by: Sherif A. Nada <[email protected]>
1 parent 679ddf4 commit e837048

File tree

49 files changed: +1469 -179 lines changed


.github/workflows/publish-command.yml

+1
@@ -178,6 +178,7 @@ jobs:
           SOURCE_CLOSE_COM_CREDS: ${{ secrets.SOURCE_CLOSE_COM_CREDS }}
           SOURCE_BAMBOO_HR_CREDS: ${{ secrets.SOURCE_BAMBOO_HR_CREDS }}
           SOURCE_BIGCOMMERCE_CREDS: ${{ secrets.SOURCE_BIGCOMMERCE_CREDS }}
+          DESTINATION_DATABRICKS_CREDS: ${{ secrets.DESTINATION_DATABRICKS_CREDS }}
       - run: |
           echo "$SPEC_CACHE_SERVICE_ACCOUNT_KEY" > spec_cache_key_file.json && docker login -u airbytebot -p ${DOCKER_PASSWORD}
           ./tools/integrations/manage.sh publish airbyte-integrations/${{ github.event.inputs.connector }} ${{ github.event.inputs.run-tests }} --publish_spec_to_cache

.github/workflows/test-command.yml

+1
@@ -173,6 +173,7 @@ jobs:
           SOURCE_CLOSE_COM_CREDS: ${{ secrets.SOURCE_CLOSE_COM_CREDS }}
           SOURCE_BAMBOO_HR_CREDS: ${{ secrets.SOURCE_BAMBOO_HR_CREDS }}
           SOURCE_BIGCOMMERCE_CREDS: ${{ secrets.SOURCE_BIGCOMMERCE_CREDS }}
+          DESTINATION_DATABRICKS_CREDS: ${{ secrets.DESTINATION_DATABRICKS_CREDS }}
       - run: |
           ./tools/bin/ci_integration_test.sh ${{ github.event.inputs.connector }}
         name: test ${{ github.event.inputs.connector }}

airbyte-integrations/builds.md

+1
@@ -90,6 +90,7 @@
 | :--- | :--- |
 | Azure Blob Storage | [![destination-azure-blob-storage](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-azure-blob-storage%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-azure-blob-storage) |
 | BigQuery | [![destination-bigquery](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-bigquery%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-bigquery) |
+| Databricks | [![destination-databricks](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-databricks%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-databricks) |
 | Google Cloud Storage (GCS) | [![destination-gcs](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-s3%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-gcs) |
 | Google PubSub | [![destination-pubsub](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-pubsub%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-pubsub) |
 | Kafka | [![destination-kafka](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-kafka%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-kafka) |
airbyte-integrations/connectors/destination-databricks/.dockerignore

@@ -0,0 +1,3 @@
*
!Dockerfile
!build
airbyte-integrations/connectors/destination-databricks/.gitignore

@@ -0,0 +1,6 @@
# The driver is not checked into the source code due to legal reasons.
# You can download the driver here:
# https://databricks.com/spark/jdbc-drivers-download
# By downloading this driver, you agree to the terms & conditions:
# https://databricks.com/jdbc-odbc-driver-license
lib/SparkJDBC42.jar
airbyte-integrations/connectors/destination-databricks/bootstrap.md

@@ -0,0 +1,6 @@
# Databricks Destination Connector Bootstrap

The Databricks connector enables a developer to sync data into a Databricks cluster. It does so in two steps:

1. Persist source data in S3 staging files in the Parquet format.
2. Create a Delta table based on the Parquet staging files (see the sketch below).
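
The sketch below shows, under stated assumptions, what that two-step flow could look like if Spark SQL were issued over the Databricks JDBC driver. The workspace URL, schema, table, and S3 path are hypothetical placeholders, and the SQL is illustrative rather than the connector's actual statements.

```java
// Minimal sketch only: assumes the connector issues Spark SQL over the Simba JDBC driver.
// Every identifier, path, and credential below is a hypothetical placeholder.
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class TwoStepSyncSketch {

  public static void main(String[] args) throws Exception {
    // Step 1 (done by the S3 stream copier, not shown here): Parquet staging files
    // have already been written under s3://example-bucket/staging/users/.

    Class.forName("com.simba.spark.jdbc.Driver");
    String jdbcUrl = "jdbc:spark://example-workspace.cloud.databricks.com:443/default;"
        + "transportMode=http;ssl=1;httpPath=sql/protocolv1/o/0/0000-000000-example0;UserAgentEntry=Airbyte";

    try (Connection connection = DriverManager.getConnection(jdbcUrl, "token", "<personal-access-token>");
        Statement statement = connection.createStatement()) {
      // Step 2: expose the Parquet staging files as a table, then materialize a Delta table from it.
      statement.execute(
          "CREATE TABLE IF NOT EXISTS public._airbyte_tmp_users "
              + "USING parquet LOCATION 's3a://example-bucket/staging/users/'");
      statement.execute(
          "CREATE OR REPLACE TABLE public.users "
              + "USING DELTA AS SELECT * FROM public._airbyte_tmp_users");
    }
  }
}
```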
airbyte-integrations/connectors/destination-databricks/Dockerfile

@@ -0,0 +1,11 @@
FROM airbyte/integration-base-java:dev

WORKDIR /airbyte
ENV APPLICATION destination-databricks

COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.name=airbyte/destination-databricks
airbyte-integrations/connectors/destination-databricks/README.md

@@ -0,0 +1,82 @@
# Destination Databricks

This is the repository for the Databricks destination connector in Java.
For information about how to use this connector within Airbyte, see [the User Documentation](https://docs.airbyte.io/integrations/destinations/databricks).

## Databricks JDBC Driver

This connector requires a JDBC driver to connect to a Databricks cluster. The driver is developed by Simba. Before downloading and using this driver, you must agree to the [JDBC ODBC driver license](https://databricks.com/jdbc-odbc-driver-license). This means that you can only use this driver to connect third-party applications to Apache Spark SQL within a Databricks offering using the ODBC and/or JDBC protocols. The driver can be downloaded from [here](https://databricks.com/spark/jdbc-drivers-download).

This is currently a private connector that is only available in Airbyte Cloud. To build and publish this connector, first download the driver and put it under the `lib` directory. Please do not publish this connector publicly. We are working on a solution to make it publicly available.

## Local development

#### Building via Gradle

From the Airbyte repository root, run:
```
./gradlew :airbyte-integrations:connectors:destination-databricks:build
```

#### Create credentials

**If you are a community contributor**, you will need access to AWS S3 and a Databricks cluster to run the integration tests:

- Create a Databricks cluster. See the [documentation](https://docs.databricks.com/clusters/create.html).
- Create an S3 bucket. See the [documentation](https://docs.aws.amazon.com/general/latest/gr/aws-sec-cred-types.html#access-keys-and-secret-access-keys).
- Grant the Databricks cluster full access to the S3 bucket, or mount it as Databricks File System (DBFS). See the [documentation](https://docs.databricks.com/data/data-sources/aws/amazon-s3.html).
- Place both the Databricks and S3 credentials in `sample_secrets/config.json`, which conforms to the spec file in `src/main/resources/spec.json`.
- Rename the directory from `sample_secrets` to `secrets`.
- Note that the `secrets` directory is git-ignored by default, so there is no danger of accidentally checking in sensitive information.

**If you are an Airbyte core member**:

- Get the `destination databricks creds` secrets from LastPass and put them in `sample_secrets/config.json`.
- Rename the directory from `sample_secrets` to `secrets`.

### Locally running the connector docker image

#### Build

Build the connector image via Gradle:
```
./gradlew :airbyte-integrations:connectors:destination-databricks:airbyteDocker
```
When building via Gradle, the docker image name and tag, respectively, are the values of the `io.airbyte.name` and `io.airbyte.version` `LABEL`s in the Dockerfile.

#### Run

Then run any of the connector commands as follows:
```
docker run --rm airbyte/destination-databricks:dev spec
docker run --rm -v $(pwd)/secrets:/secrets airbyte/destination-databricks:dev check --config /secrets/config.json
docker run --rm -v $(pwd)/secrets:/secrets airbyte/destination-databricks:dev discover --config /secrets/config.json
docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/destination-databricks:dev read --config /secrets/config.json --catalog /integration_tests/configured_catalog.json
```

## Testing

We use `JUnit` for Java tests.

### Unit and Integration Tests

Place unit tests under `src/test/java/io/airbyte/integrations/destination/databricks/`.

#### Acceptance Tests

Airbyte has a standard test suite that all destination connectors must pass. Implement the `TODO`s in
`src/test-integration/java/io/airbyte/integrations/destination/databricks/DatabricksDestinationAcceptanceTest.java`.

### Using gradle to run tests

All commands should be run from the Airbyte project root.
To run unit tests:
```
./gradlew :airbyte-integrations:connectors:destination-databricks:unitTest
```
To run acceptance and custom integration tests:
```
./gradlew :airbyte-integrations:connectors:destination-databricks:integrationTest
```

## Dependency Management

### Publishing a new version of the connector

You've checked out the repo, implemented a million-dollar feature, and you're ready to share your changes with the world. Now what?

1. Make sure your changes are passing unit and integration tests.
1. Bump the connector version in `Dockerfile` -- just increment the value of the `LABEL io.airbyte.version` appropriately (we use [SemVer](https://semver.org/)).
1. Create a Pull Request.
1. Pat yourself on the back for being an awesome contributor.
1. Someone from Airbyte will take a look at your PR and iterate with you to merge it into master.
airbyte-integrations/connectors/destination-databricks/build.gradle

@@ -0,0 +1,31 @@
plugins {
    id 'application'
    id 'airbyte-docker'
    id 'airbyte-integration-test-java'
}

application {
    mainClass = 'io.airbyte.integrations.destination.databricks.DatabricksDestination'
}

dependencies {
    implementation project(':airbyte-db:lib')
    implementation project(':airbyte-config:models')
    implementation project(':airbyte-protocol:models')
    implementation project(':airbyte-integrations:bases:base-java')
    implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)
    implementation project(':airbyte-integrations:connectors:destination-jdbc')
    implementation project(':airbyte-integrations:connectors:destination-s3')

    // The Spark JDBC driver is not checked into the repo for legal reasons
    implementation files("lib/SparkJDBC42.jar")

    // parquet
    implementation group: 'org.apache.hadoop', name: 'hadoop-common', version: '3.3.0'
    implementation group: 'org.apache.hadoop', name: 'hadoop-aws', version: '3.3.0'
    implementation group: 'org.apache.hadoop', name: 'hadoop-mapreduce-client-core', version: '3.3.0'
    implementation group: 'org.apache.parquet', name: 'parquet-avro', version: '1.12.0'
    implementation group: 'tech.allegro.schema.json2avro', name: 'converter', version: '0.2.10'

    integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')
    integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-databricks')
}

airbyte-integrations/connectors/destination-databricks/lib/.keep

Whitespace-only changes.
airbyte-integrations/connectors/destination-databricks/sample_secrets/config.json

@@ -0,0 +1,15 @@
{
  "databricks_server_hostname": "required",
  "databricks_http_path": "required",
  "databricks_port": "443",
  "databricks_personal_access_token": "required",
  "database_schema": "public",
  "data_source": {
    "data_source_type": "S3",
    "s3_bucket_name": "required",
    "s3_bucket_path": "required",
    "s3_bucket_region": "required",
    "s3_access_key_id": "required",
    "s3_secret_access_key": "required"
  }
}
airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksConstants.java

@@ -0,0 +1,14 @@
package io.airbyte.integrations.destination.databricks;

import java.util.Set;

public class DatabricksConstants {

  public static final String DATABRICKS_USERNAME = "token";
  public static final String DATABRICKS_DRIVER_CLASS = "com.simba.spark.jdbc.Driver";

  public static final Set<String> DEFAULT_TBL_PROPERTIES = Set.of(
      "delta.autoOptimize.optimizeWrite = true",
      "delta.autoOptimize.autoCompact = true");

}
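
As a hypothetical illustration of how these defaults might be consumed, the snippet below joins `DEFAULT_TBL_PROPERTIES` into a Databricks `TBLPROPERTIES` clause. The table name is a placeholder, and the connector's real SQL generation lives in classes that are not part of this excerpt.

```java
// Hypothetical illustration only: shows how DEFAULT_TBL_PROPERTIES could be folded into
// a Databricks SQL statement. The connector's actual SQL generation is not in this excerpt.
import io.airbyte.integrations.destination.databricks.DatabricksConstants;

public class TblPropertiesSketch {

  public static void main(String[] args) {
    String properties = String.join(", ", DatabricksConstants.DEFAULT_TBL_PROPERTIES);
    // The table name "public.users" is a placeholder.
    String sql = "ALTER TABLE public.users SET TBLPROPERTIES (" + properties + ")";
    // Prints something like (Set ordering is unspecified):
    // ALTER TABLE public.users SET TBLPROPERTIES (delta.autoOptimize.optimizeWrite = true, delta.autoOptimize.autoCompact = true)
    System.out.println(sql);
  }
}
```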
airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksDestination.java

@@ -0,0 +1,101 @@
/*
 * MIT License
 *
 * Copyright (c) 2020 Airbyte
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in all
 * copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */

package io.airbyte.integrations.destination.databricks;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.db.Databases;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.destination.jdbc.SqlOperations;
import io.airbyte.integrations.destination.jdbc.copy.CopyConsumerFactory;
import io.airbyte.integrations.destination.jdbc.copy.CopyDestination;
import io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopier;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.util.function.Consumer;

public class DatabricksDestination extends CopyDestination {

  public DatabricksDestination() {
    super("database_schema");
  }

  public static void main(String[] args) throws Exception {
    new IntegrationRunner(new DatabricksDestination()).run(args);
  }

  @Override
  public AirbyteMessageConsumer getConsumer(JsonNode config, ConfiguredAirbyteCatalog catalog, Consumer<AirbyteMessage> outputRecordCollector) {
    DatabricksDestinationConfig databricksConfig = DatabricksDestinationConfig.get(config);
    return CopyConsumerFactory.create(
        outputRecordCollector,
        getDatabase(config),
        getSqlOperations(),
        getNameTransformer(),
        databricksConfig,
        catalog,
        new DatabricksStreamCopierFactory(),
        databricksConfig.getDatabaseSchema());
  }

  @Override
  public void checkPersistence(JsonNode config) {
    DatabricksDestinationConfig databricksConfig = DatabricksDestinationConfig.get(config);
    S3StreamCopier.attemptS3WriteAndDelete(databricksConfig.getS3DestinationConfig().getS3Config());
  }

  @Override
  public ExtendedNameTransformer getNameTransformer() {
    return new DatabricksNameTransformer();
  }

  @Override
  public JdbcDatabase getDatabase(JsonNode jsonConfig) {
    return getDatabase(DatabricksDestinationConfig.get(jsonConfig));
  }

  @Override
  public SqlOperations getSqlOperations() {
    return new DatabricksSqlOperations();
  }

  static String getDatabricksConnectionString(DatabricksDestinationConfig databricksConfig) {
    return String.format("jdbc:spark://%s:%s/default;transportMode=http;ssl=1;httpPath=%s;UserAgentEntry=Airbyte",
        databricksConfig.getDatabricksServerHostname(),
        databricksConfig.getDatabricksPort(),
        databricksConfig.getDatabricksHttpPath());
  }

  static JdbcDatabase getDatabase(DatabricksDestinationConfig databricksConfig) {
    return Databases.createJdbcDatabase(
        DatabricksConstants.DATABRICKS_USERNAME,
        databricksConfig.getDatabricksPersonalAccessToken(),
        getDatabricksConnectionString(databricksConfig),
        DatabricksConstants.DATABRICKS_DRIVER_CLASS);
  }

}
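
For orientation, here is a hedged, self-contained sketch of the JDBC URL that `getDatabricksConnectionString` assembles. The hostname and HTTP path values are hypothetical and not taken from this commit; the port matches the sample config default.

```java
// Illustration only: mirrors the format string in getDatabricksConnectionString.
public class ConnectionStringExample {

  public static void main(String[] args) {
    String hostname = "abc-12345678.cloud.databricks.com";          // hypothetical databricks_server_hostname
    String port = "443";                                            // databricks_port (sample config default)
    String httpPath = "sql/protocolv1/o/0/0000-000000-example0";    // hypothetical databricks_http_path

    String url = String.format(
        "jdbc:spark://%s:%s/default;transportMode=http;ssl=1;httpPath=%s;UserAgentEntry=Airbyte",
        hostname, port, httpPath);

    // => jdbc:spark://abc-12345678.cloud.databricks.com:443/default;transportMode=http;ssl=1;
    //    httpPath=sql/protocolv1/o/0/0000-000000-example0;UserAgentEntry=Airbyte
    System.out.println(url);
  }
}
```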
