Commit d7db844

Implement databricks destination as a stream copier (#5748)
1 parent af94c07 commit d7db844

11 files changed (+247 −117 lines)


airbyte-integrations/connectors/destination-databricks/build.gradle (+8)
@@ -15,6 +15,14 @@ dependencies {
     implementation project(':airbyte-integrations:bases:base-java')
     implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)
     implementation project(':airbyte-integrations:connectors:destination-jdbc')
+    implementation project(':airbyte-integrations:connectors:destination-s3')
+
+    // parquet
+    implementation group: 'org.apache.hadoop', name: 'hadoop-common', version: '3.3.0'
+    implementation group: 'org.apache.hadoop', name: 'hadoop-aws', version: '3.3.0'
+    implementation group: 'org.apache.hadoop', name: 'hadoop-mapreduce-client-core', version: '3.3.0'
+    implementation group: 'org.apache.parquet', name: 'parquet-avro', version: '1.12.0'
+    implementation group: 'tech.allegro.schema.json2avro', name: 'converter', version: '0.2.10'
 
     integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')
     integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-databricks')

airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksDestination.java (+40 −73)
@@ -25,93 +25,60 @@
 package io.airbyte.integrations.destination.databricks;
 
 import com.fasterxml.jackson.databind.JsonNode;
-import com.google.common.collect.ImmutableMap;
-import io.airbyte.commons.json.Jsons;
+import io.airbyte.db.Databases;
 import io.airbyte.db.jdbc.JdbcDatabase;
-import io.airbyte.integrations.base.Destination;
-import io.airbyte.integrations.base.IntegrationRunner;
-import io.airbyte.integrations.destination.jdbc.AbstractJdbcDestination;
-import io.airbyte.protocol.models.AirbyteConnectionStatus;
-import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
-import java.io.File;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.Optional;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import io.airbyte.integrations.base.AirbyteMessageConsumer;
+import io.airbyte.integrations.destination.ExtendedNameTransformer;
+import io.airbyte.integrations.destination.jdbc.SqlOperations;
+import io.airbyte.integrations.destination.jdbc.copy.CopyConsumerFactory;
+import io.airbyte.integrations.destination.jdbc.copy.CopyDestination;
+import io.airbyte.integrations.destination.jdbc.copy.s3.S3Config;
+import io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopier;
+import io.airbyte.protocol.models.AirbyteMessage;
+import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
+import java.util.function.Consumer;
 
-public class DatabricksDestination extends AbstractJdbcDestination implements Destination {
+public class DatabricksDestination extends CopyDestination {
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(DatabricksDestination.class);
-
-  public static final String DRIVER_CLASS = "com.simba.spark.jdbc.Driver";
-
-  // TODO: this isn't working yet!
-  public static void getDriver() throws MalformedURLException, ClassNotFoundException {
-    File driverJar = new File("/Users/phlair/Downloads/SparkDriver/SparkJDBC42.jar");
-    URL jarUrl = new URL("jar", "", "file:" + driverJar.getAbsolutePath() + "!/");
-    URLClassLoader myLoader = new URLClassLoader(new URL[] { jarUrl } );
-    myLoader.loadClass(DRIVER_CLASS);
-  }
+  private static final String DRIVER_CLASS = "com.simba.spark.jdbc.Driver";
 
   @Override
-  public AirbyteConnectionStatus check(JsonNode config) {
-
-    try (final JdbcDatabase database = getDatabase(config)) {
-      DatabricksSqlOperations databricksSqlOperations = (DatabricksSqlOperations) getSqlOperations();
-
-      String outputSchema = getNamingResolver().getIdentifier(config.get("database").asText());
-      attemptSQLCreateAndDropTableOperations(outputSchema, database, getNamingResolver(), databricksSqlOperations);
-
-      databricksSqlOperations.verifyLocalFileEnabled(database);
-
-      // TODO: enforce databricks runtime version instead of this mySql code
-      // VersionCompatibility compatibility = dbSqlOperations.isCompatibleVersion(database);
-      // if (!compatibility.isCompatible()) {
-      //   throw new RuntimeException(String
-      //       .format("Your MySQL version %s is not compatible with Airbyte",
-      //           compatibility.getVersion()));
-      // }
-
-      return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED);
-    } catch (Exception e) {
-      LOGGER.error("Exception while checking connection: ", e);
-      return new AirbyteConnectionStatus()
-          .withStatus(Status.FAILED)
-          .withMessage("Could not connect with provided configuration. \n" + e.getMessage());
-    }
+  public AirbyteMessageConsumer getConsumer(JsonNode config, ConfiguredAirbyteCatalog catalog, Consumer<AirbyteMessage> outputRecordCollector) {
+    return CopyConsumerFactory.create(
+        outputRecordCollector,
+        getDatabase(config),
+        getSqlOperations(),
+        getNameTransformer(),
+        S3Config.getS3Config(config),
+        catalog,
+        new DatabricksStreamCopierFactory(),
+        config.get("schema").asText()
+    );
   }
 
-  public DatabricksDestination() {
-    super(DRIVER_CLASS, new DatabricksNameTransformer(), new DatabricksSqlOperations());
+  @Override
+  public void checkPersistence(JsonNode config) {
+    S3StreamCopier.attemptS3WriteAndDelete(S3Config.getS3Config(config));
   }
 
   @Override
-  public JsonNode toJdbcConfig(JsonNode databricksConfig) {
-    return getJdbcConfig(databricksConfig);
+  public ExtendedNameTransformer getNameTransformer() {
+    return new DatabricksNameTransformer();
   }
 
-  public static JsonNode getJdbcConfig(JsonNode databricksConfig) {
-    final String schema = Optional.ofNullable(databricksConfig.get("schema")).map(JsonNode::asText).orElse("default");
-
-    return Jsons.jsonNode(ImmutableMap.builder()
-        .put("username", "dummy")
-        .put("password", "dummy")
-        // .put("jdbc_url", String.format("jdbc:TODO://%s:%s/%s",
-        //     databricksConfig.get("host").asText(),
-        //     databricksConfig.get("port").asText(),
-        //     databricksConfig.get("database").asText()))
-        // .put("schema", schema)
-        .put("jdbc_url", databricksConfig.get("jdbcUrl").asText())
-        .build());
+  @Override
+  public JdbcDatabase getDatabase(JsonNode databricksConfig) {
+    return Databases.createJdbcDatabase(
+        databricksConfig.get("username").asText(),
+        databricksConfig.has("password") ? databricksConfig.get("password").asText() : null,
+        databricksConfig.get("jdbc_url").asText(),
+        DRIVER_CLASS
+    );
   }
 
-  public static void main(String[] args) throws Exception {
-    LOGGER.info("starting destination: {}", DatabricksDestination.class);
-    getDriver();
-    new IntegrationRunner(new DatabricksDestination()).run(args);
-    LOGGER.info("completed destination: {}", DatabricksDestination.class);
+  @Override
+  public SqlOperations getSqlOperations() {
+    return new DatabricksSqlOperations();
   }
 
 }
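For reference, a minimal sketch of the connection config keys this class reads directly ("schema" in getConsumer; "username", "password", "jdbc_url" in getDatabase). The values below are placeholders, and the S3 staging keys parsed by S3Config.getS3Config are omitted because they are not visible in this diff:

{
  "schema": "default",
  "username": "<databricks-user>",
  "password": "<password-or-personal-access-token>",
  "jdbc_url": "<simba-spark-jdbc-url>"
}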
airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopier.java (new file, +114)

@@ -0,0 +1,114 @@
+package io.airbyte.integrations.destination.databricks;
+
+import com.amazonaws.services.s3.AmazonS3;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.airbyte.db.jdbc.JdbcDatabase;
+import io.airbyte.integrations.destination.ExtendedNameTransformer;
+import io.airbyte.integrations.destination.jdbc.SqlOperations;
+import io.airbyte.integrations.destination.jdbc.copy.StreamCopier;
+import io.airbyte.integrations.destination.jdbc.copy.s3.S3Config;
+import io.airbyte.integrations.destination.s3.S3DestinationConfig;
+import io.airbyte.integrations.destination.s3.parquet.S3ParquetFormatConfig;
+import io.airbyte.integrations.destination.s3.parquet.S3ParquetWriter;
+import io.airbyte.integrations.destination.s3.writer.S3WriterFactory;
+import io.airbyte.protocol.models.AirbyteRecordMessage;
+import io.airbyte.protocol.models.AirbyteStream;
+import io.airbyte.protocol.models.ConfiguredAirbyteStream;
+import java.sql.Timestamp;
+import java.util.UUID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This implementation is similar to {@link io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopier}.
+ * The difference is that this implementation creates Parquet staging files, instead of CSV ones.
+ */
+public class DatabricksStreamCopier implements StreamCopier {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(DatabricksStreamCopier.class);
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+
+  private final AmazonS3 s3Client;
+  private final S3Config s3Config;
+  private final String tmpTableName;
+  private final AirbyteStream stream;
+  private final JdbcDatabase db;
+  private final ExtendedNameTransformer nameTransformer;
+  private final SqlOperations sqlOperations;
+  private final S3ParquetWriter parquetWriter;
+
+  public DatabricksStreamCopier(String stagingFolder,
+                                String schema,
+                                ConfiguredAirbyteStream configuredStream,
+                                AmazonS3 s3Client,
+                                JdbcDatabase db,
+                                S3Config s3Config,
+                                ExtendedNameTransformer nameTransformer,
+                                SqlOperations sqlOperations,
+                                S3WriterFactory writerFactory,
+                                Timestamp uploadTime) throws Exception {
+    this.stream = configuredStream.getStream();
+    this.db = db;
+    this.nameTransformer = nameTransformer;
+    this.sqlOperations = sqlOperations;
+    this.tmpTableName = nameTransformer.getTmpTableName(stream.getName());
+    this.s3Client = s3Client;
+    this.s3Config = s3Config;
+    this.parquetWriter = (S3ParquetWriter) writerFactory
+        .create(getS3DestinationConfig(s3Config, stagingFolder), s3Client, configuredStream, uploadTime);
+  }
+
+  @Override
+  public void write(UUID id, AirbyteRecordMessage recordMessage) throws Exception {
+    parquetWriter.write(id, recordMessage);
+  }
+
+  @Override
+  public void closeStagingUploader(boolean hasFailed) throws Exception {
+    parquetWriter.close(hasFailed);
+  }
+
+  @Override
+  public void createTemporaryTable() throws Exception {
+
+  }
+
+  @Override
+  public void copyStagingFileToTemporaryTable() throws Exception {
+
+  }
+
+  @Override
+  public void createDestinationSchema() throws Exception {
+
+  }
+
+  @Override
+  public String createDestinationTable() throws Exception {
+    return null;
+  }
+
+  @Override
+  public String generateMergeStatement(String destTableName) throws Exception {
+    return null;
+  }
+
+  @Override
+  public void removeFileAndDropTmpTable() throws Exception {
+
+  }
+
+  private S3DestinationConfig getS3DestinationConfig(S3Config s3Config, String stagingFolder) {
+    return new S3DestinationConfig(
+        s3Config.getEndpoint(),
+        s3Config.getBucketName(),
+        stagingFolder,
+        s3Config.getRegion(),
+        s3Config.getAccessKeyId(),
+        s3Config.getSecretAccessKey(),
+        // use default parquet format config
+        new S3ParquetFormatConfig(MAPPER.createObjectNode())
+    );
+  }
+
+}
airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopierFactory.java (new file, +43)

@@ -0,0 +1,43 @@
+package io.airbyte.integrations.destination.databricks;
+
+import com.amazonaws.services.s3.AmazonS3;
+import io.airbyte.db.jdbc.JdbcDatabase;
+import io.airbyte.integrations.destination.ExtendedNameTransformer;
+import io.airbyte.integrations.destination.jdbc.SqlOperations;
+import io.airbyte.integrations.destination.jdbc.copy.StreamCopier;
+import io.airbyte.integrations.destination.jdbc.copy.StreamCopierFactory;
+import io.airbyte.integrations.destination.jdbc.copy.s3.S3Config;
+import io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopier;
+import io.airbyte.integrations.destination.s3.parquet.S3ParquetWriter;
+import io.airbyte.integrations.destination.s3.writer.ProductionWriterFactory;
+import io.airbyte.integrations.destination.s3.writer.S3WriterFactory;
+import io.airbyte.protocol.models.AirbyteStream;
+import io.airbyte.protocol.models.ConfiguredAirbyteStream;
+import java.sql.Timestamp;
+
+public class DatabricksStreamCopierFactory implements StreamCopierFactory<S3Config> {
+
+  @Override
+  public StreamCopier create(String configuredSchema,
+                             S3Config s3Config,
+                             String stagingFolder,
+                             ConfiguredAirbyteStream configuredStream,
+                             ExtendedNameTransformer nameTransformer,
+                             JdbcDatabase db,
+                             SqlOperations sqlOperations) {
+    try {
+      AirbyteStream stream = configuredStream.getStream();
+      String schema = StreamCopierFactory.getSchema(stream, configuredSchema, nameTransformer);
+      AmazonS3 s3Client = S3StreamCopier.getAmazonS3(s3Config);
+      S3WriterFactory writerFactory = new ProductionWriterFactory();
+      Timestamp uploadTimestamp = new Timestamp(System.currentTimeMillis());
+
+      return new DatabricksStreamCopier(
+          stagingFolder, schema, configuredStream, s3Client, db, s3Config, nameTransformer, sqlOperations, writerFactory, uploadTimestamp);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+
+  }
+
+}

airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/CopyConsumerFactory.java (+2 −7)
@@ -24,7 +24,6 @@
 
 package io.airbyte.integrations.destination.jdbc.copy;
 
-import io.airbyte.commons.json.Jsons;
 import io.airbyte.db.jdbc.JdbcDatabase;
 import io.airbyte.integrations.base.AirbyteMessageConsumer;
 import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
@@ -37,8 +36,6 @@
 import io.airbyte.protocol.models.AirbyteMessage;
 import io.airbyte.protocol.models.AirbyteRecordMessage;
 import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
-import java.sql.Timestamp;
-import java.time.Instant;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -94,8 +91,7 @@ private static <T> Map<AirbyteStreamNameNamespacePair, StreamCopier> createWrite
     for (var configuredStream : catalog.getStreams()) {
       var stream = configuredStream.getStream();
       var pair = AirbyteStreamNameNamespacePair.fromAirbyteSteam(stream);
-      var syncMode = configuredStream.getDestinationSyncMode();
-      var copier = streamCopierFactory.create(defaultSchema, config, stagingFolder, syncMode, stream, namingResolver, database, sqlOperations);
+      var copier = streamCopierFactory.create(defaultSchema, config, stagingFolder, configuredStream, namingResolver, database, sqlOperations);
 
       pairToCopier.put(pair, copier);
     }
@@ -116,8 +112,7 @@ private static RecordWriter recordWriterFunction(Map<AirbyteStreamNameNamespaceP
     if (sqlOperations.isValidData(recordMessage.getData())) {
       // TODO Truncate json data instead of throwing whole record away?
       // or should we upload it into a special rejected record folder in s3 instead?
-      var emittedAt = Timestamp.from(Instant.ofEpochMilli(recordMessage.getEmittedAt()));
-      pairToCopier.get(pair).write(id, Jsons.serialize(recordMessage.getData()), emittedAt);
+      pairToCopier.get(pair).write(id, recordMessage);
     } else {
       pairToIgnoredRecordCount.put(pair, pairToIgnoredRecordCount.getOrDefault(pair, 0L) + 1L);
     }

airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/StreamCopier.java (+2 −1)
@@ -24,6 +24,7 @@
 
 package io.airbyte.integrations.destination.jdbc.copy;
 
+import io.airbyte.protocol.models.AirbyteRecordMessage;
 import java.sql.Timestamp;
 import java.util.UUID;
 
@@ -36,7 +37,7 @@ public interface StreamCopier {
   /**
    * Writes a value to a staging file for the stream.
    */
-  void write(UUID id, String jsonDataString, Timestamp emittedAt) throws Exception;
+  void write(UUID id, AirbyteRecordMessage recordMessage) throws Exception;
 
   /**
    * Closes the writer for the stream to the staging persistence. This method should block until all
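With this signature change, each copier receives the full AirbyteRecordMessage and decides the staging format itself; the JSON serialization and emitted-at conversion that CopyConsumerFactory previously performed (see the lines removed above) now belong inside the copier. A minimal sketch of how a CSV-based copier might adapt its write() — the csvPrinter field and the surrounding class are assumptions for illustration, not part of this commit:

  @Override
  public void write(UUID id, AirbyteRecordMessage recordMessage) throws Exception {
    // Serialize the record data and convert the emitted-at epoch millis here,
    // instead of receiving them pre-computed from CopyConsumerFactory.
    var emittedAt = Timestamp.from(Instant.ofEpochMilli(recordMessage.getEmittedAt()));
    csvPrinter.printRecord(id, Jsons.serialize(recordMessage.getData()), emittedAt); // hypothetical CSV staging writer
  }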

airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/StreamCopierFactory.java (+10 −2)
@@ -28,17 +28,25 @@
 import io.airbyte.integrations.destination.ExtendedNameTransformer;
 import io.airbyte.integrations.destination.jdbc.SqlOperations;
 import io.airbyte.protocol.models.AirbyteStream;
+import io.airbyte.protocol.models.ConfiguredAirbyteStream;
 import io.airbyte.protocol.models.DestinationSyncMode;
 
 public interface StreamCopierFactory<T> {
 
   StreamCopier create(String configuredSchema,
                       T config,
                       String stagingFolder,
-                      DestinationSyncMode syncMode,
-                      AirbyteStream stream,
+                      ConfiguredAirbyteStream configuredStream,
                       ExtendedNameTransformer nameTransformer,
                       JdbcDatabase db,
                       SqlOperations sqlOperations);
 
+  static String getSchema(AirbyteStream stream, String configuredSchema, ExtendedNameTransformer nameTransformer) {
+    if (stream.getNamespace() != null) {
+      return nameTransformer.convertStreamName(stream.getNamespace());
+    } else {
+      return nameTransformer.convertStreamName(configuredSchema);
+    }
+  }
+
 }
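The new static getSchema helper gives namespace-aware schema resolution: a stream's namespace, when set, overrides the connection-level default schema, and either value is normalized by the name transformer. A small usage sketch with hypothetical stream names and a default schema of "public" (assuming ExtendedNameTransformer can be instantiated directly here):

import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.destination.jdbc.copy.StreamCopierFactory;
import io.airbyte.protocol.models.AirbyteStream;

public class SchemaResolutionExample {

  public static void main(String[] args) {
    ExtendedNameTransformer nameTransformer = new ExtendedNameTransformer();

    // A stream with an explicit namespace: the namespace wins over the configured default.
    AirbyteStream orders = new AirbyteStream().withName("orders").withNamespace("sales");
    System.out.println(StreamCopierFactory.getSchema(orders, "public", nameTransformer)); // "sales"

    // A stream without a namespace: falls back to the configured default schema.
    AirbyteStream events = new AirbyteStream().withName("events");
    System.out.println(StreamCopierFactory.getSchema(events, "public", nameTransformer)); // "public"
  }
}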
