Implement databricks connector as a copy destination #5748

Merged 1 commit on Aug 31, 2021
@@ -15,6 +15,14 @@ dependencies {
implementation project(':airbyte-integrations:bases:base-java')
implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)
implementation project(':airbyte-integrations:connectors:destination-jdbc')
implementation project(':airbyte-integrations:connectors:destination-s3')
Contributor Author:
Let's depend on the s3 destination for now and DRY it later.


// parquet
implementation group: 'org.apache.hadoop', name: 'hadoop-common', version: '3.3.0'
implementation group: 'org.apache.hadoop', name: 'hadoop-aws', version: '3.3.0'
implementation group: 'org.apache.hadoop', name: 'hadoop-mapreduce-client-core', version: '3.3.0'
implementation group: 'org.apache.parquet', name: 'parquet-avro', version: '1.12.0'
implementation group: 'tech.allegro.schema.json2avro', name: 'converter', version: '0.2.10'

integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')
integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-databricks')
@@ -25,93 +25,60 @@
package io.airbyte.integrations.destination.databricks;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Databases;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.destination.jdbc.AbstractJdbcDestination;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
import java.io.File;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.destination.jdbc.SqlOperations;
import io.airbyte.integrations.destination.jdbc.copy.CopyConsumerFactory;
import io.airbyte.integrations.destination.jdbc.copy.CopyDestination;
import io.airbyte.integrations.destination.jdbc.copy.s3.S3Config;
import io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopier;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.util.function.Consumer;

public class DatabricksDestination extends AbstractJdbcDestination implements Destination {
public class DatabricksDestination extends CopyDestination {

private static final Logger LOGGER = LoggerFactory.getLogger(DatabricksDestination.class);

public static final String DRIVER_CLASS = "com.simba.spark.jdbc.Driver";

// TODO: this isn't working yet!
public static void getDriver() throws MalformedURLException, ClassNotFoundException {
File driverJar = new File("/Users/phlair/Downloads/SparkDriver/SparkJDBC42.jar");
URL jarUrl = new URL("jar", "", "file:" + driverJar.getAbsolutePath() + "!/");
URLClassLoader myLoader = new URLClassLoader(new URL[] { jarUrl } );
myLoader.loadClass(DRIVER_CLASS);
}
private static final String DRIVER_CLASS = "com.simba.spark.jdbc.Driver";

@Override
public AirbyteConnectionStatus check(JsonNode config) {
Contributor Author:
The CopyDestination can check or create the SQL tables by calling the abstract method. So we only need to check the persistence in this implementation.


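For context, the base class presumably runs both checks itself and delegates to the abstract methods implemented here. A rough sketch, assuming the SqlOperations helper names and the "schema" config field; this is not the actual CopyDestination source:

// Rough sketch only; helper names and the config field are assumptions.
public abstract class CopyDestination implements Destination {

  @Override
  public AirbyteConnectionStatus check(JsonNode config) {
    try {
      // 1. The concrete destination verifies the staging persistence (S3 for this connector).
      checkPersistence(config);
    } catch (Exception e) {
      return new AirbyteConnectionStatus().withStatus(Status.FAILED)
          .withMessage("Could not connect to the staging persistence. \n" + e.getMessage());
    }
    try (final JdbcDatabase database = getDatabase(config)) {
      // 2. Shared code verifies SQL access with a create/drop round trip on a scratch table.
      final String outputSchema = getNameTransformer().convertStreamName(config.get("schema").asText());
      final String scratchTable = getNameTransformer().getTmpTableName("connection_check");
      getSqlOperations().createSchemaIfNotExists(database, outputSchema);
      getSqlOperations().createTableIfNotExists(database, outputSchema, scratchTable);
      getSqlOperations().dropTableIfExists(database, outputSchema, scratchTable);
      return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED);
    } catch (Exception e) {
      return new AirbyteConnectionStatus().withStatus(Status.FAILED)
          .withMessage("Could not connect with provided configuration. \n" + e.getMessage());
    }
  }

  public abstract void checkPersistence(JsonNode config) throws Exception;

  public abstract ExtendedNameTransformer getNameTransformer();

  public abstract JdbcDatabase getDatabase(JsonNode config) throws Exception;

  public abstract SqlOperations getSqlOperations();

}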
try (final JdbcDatabase database = getDatabase(config)) {
DatabricksSqlOperations databricksSqlOperations = (DatabricksSqlOperations) getSqlOperations();

String outputSchema = getNamingResolver().getIdentifier(config.get("database").asText());
attemptSQLCreateAndDropTableOperations(outputSchema, database, getNamingResolver(), databricksSqlOperations);

databricksSqlOperations.verifyLocalFileEnabled(database);

// TODO: enforce databricks runtime version instead of this mySql code
// VersionCompatibility compatibility = dbSqlOperations.isCompatibleVersion(database);
// if (!compatibility.isCompatible()) {
// throw new RuntimeException(String
// .format("Your MySQL version %s is not compatible with Airbyte",
// compatibility.getVersion()));
// }

return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED);
} catch (Exception e) {
LOGGER.error("Exception while checking connection: ", e);
return new AirbyteConnectionStatus()
.withStatus(Status.FAILED)
.withMessage("Could not connect with provided configuration. \n" + e.getMessage());
}
public AirbyteMessageConsumer getConsumer(JsonNode config, ConfiguredAirbyteCatalog catalog, Consumer<AirbyteMessage> outputRecordCollector) {
return CopyConsumerFactory.create(
outputRecordCollector,
getDatabase(config),
getSqlOperations(),
getNameTransformer(),
S3Config.getS3Config(config),
catalog,
new DatabricksStreamCopierFactory(),
config.get("schema").asText()
);
}

public DatabricksDestination() {
super(DRIVER_CLASS, new DatabricksNameTransformer(), new DatabricksSqlOperations());
@Override
public void checkPersistence(JsonNode config) {
S3StreamCopier.attemptS3WriteAndDelete(S3Config.getS3Config(config));
}

@Override
public JsonNode toJdbcConfig(JsonNode databricksConfig) {
return getJdbcConfig(databricksConfig);
public ExtendedNameTransformer getNameTransformer() {
return new DatabricksNameTransformer();
}

public static JsonNode getJdbcConfig(JsonNode databricksConfig) {
final String schema = Optional.ofNullable(databricksConfig.get("schema")).map(JsonNode::asText).orElse("default");

return Jsons.jsonNode(ImmutableMap.builder()
.put("username", "dummy")
.put("password", "dummy")
// .put("jdbc_url", String.format("jdbc:TODO://%s:%s/%s",
// databricksConfig.get("host").asText(),
// databricksConfig.get("port").asText(),
// databricksConfig.get("database").asText()))
// .put("schema", schema)
.put("jdbc_url", databricksConfig.get("jdbcUrl").asText())
.build());
@Override
public JdbcDatabase getDatabase(JsonNode databricksConfig) {
return Databases.createJdbcDatabase(
databricksConfig.get("username").asText(),
databricksConfig.has("password") ? databricksConfig.get("password").asText() : null,
databricksConfig.get("jdbc_url").asText(),
DRIVER_CLASS
);
}

public static void main(String[] args) throws Exception {
LOGGER.info("starting destination: {}", DatabricksDestination.class);
getDriver();
new IntegrationRunner(new DatabricksDestination()).run(args);
LOGGER.info("completed destination: {}", DatabricksDestination.class);
@Override
public SqlOperations getSqlOperations() {
return new DatabricksSqlOperations();
}

}
@@ -0,0 +1,114 @@
package io.airbyte.integrations.destination.databricks;

import com.amazonaws.services.s3.AmazonS3;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.destination.jdbc.SqlOperations;
import io.airbyte.integrations.destination.jdbc.copy.StreamCopier;
import io.airbyte.integrations.destination.jdbc.copy.s3.S3Config;
import io.airbyte.integrations.destination.s3.S3DestinationConfig;
import io.airbyte.integrations.destination.s3.parquet.S3ParquetFormatConfig;
import io.airbyte.integrations.destination.s3.parquet.S3ParquetWriter;
import io.airbyte.integrations.destination.s3.writer.S3WriterFactory;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import java.sql.Timestamp;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This implementation is similar to {@link io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopier}.
* The difference is that this implementation creates Parquet staging files, instead of CSV ones.
*/
public class DatabricksStreamCopier implements StreamCopier {
Contributor Author:
We can focus on implementing the methods in this class to transfer the data from S3 to Databricks.


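Eventually, the copy step could issue a Databricks COPY INTO against the staged Parquet files. A minimal sketch, assuming JdbcDatabase#execute(String) is available and using placeholder names throughout; the final SQL and wiring may differ:

  // Illustrative sketch of the future S3-to-Databricks copy step; the staging
  // path, schema and table names are placeholders, not the real implementation.
  private void copyParquetStagingIntoTable(JdbcDatabase db, String stagingS3Path, String schemaName, String tableName) throws Exception {
    final String copySql = String.format(
        "COPY INTO %s.%s FROM '%s' FILEFORMAT = PARQUET",
        schemaName, tableName, stagingS3Path);
    db.execute(copySql);
  }

  // e.g. copyParquetStagingIntoTable(db, "s3://my-bucket/staging/users/", "default", tmpTableName);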
private static final Logger LOGGER = LoggerFactory.getLogger(DatabricksStreamCopier.class);
private static final ObjectMapper MAPPER = new ObjectMapper();

private final AmazonS3 s3Client;
private final S3Config s3Config;
private final String tmpTableName;
private final AirbyteStream stream;
private final JdbcDatabase db;
private final ExtendedNameTransformer nameTransformer;
private final SqlOperations sqlOperations;
private final S3ParquetWriter parquetWriter;

public DatabricksStreamCopier(String stagingFolder,
String schema,
ConfiguredAirbyteStream configuredStream,
AmazonS3 s3Client,
JdbcDatabase db,
S3Config s3Config,
ExtendedNameTransformer nameTransformer,
SqlOperations sqlOperations,
S3WriterFactory writerFactory,
Timestamp uploadTime) throws Exception {
this.stream = configuredStream.getStream();
this.db = db;
this.nameTransformer = nameTransformer;
this.sqlOperations = sqlOperations;
this.tmpTableName = nameTransformer.getTmpTableName(stream.getName());
this.s3Client = s3Client;
this.s3Config = s3Config;
this.parquetWriter = (S3ParquetWriter) writerFactory
.create(getS3DestinationConfig(s3Config, stagingFolder), s3Client, configuredStream, uploadTime);
}

@Override
public void write(UUID id, AirbyteRecordMessage recordMessage) throws Exception {
parquetWriter.write(id, recordMessage);
}

@Override
public void closeStagingUploader(boolean hasFailed) throws Exception {
parquetWriter.close(hasFailed);
}

@Override
public void createTemporaryTable() throws Exception {

}

@Override
public void copyStagingFileToTemporaryTable() throws Exception {

}

@Override
public void createDestinationSchema() throws Exception {

}

@Override
public String createDestinationTable() throws Exception {
return null;
}

@Override
public String generateMergeStatement(String destTableName) throws Exception {
return null;
}

@Override
public void removeFileAndDropTmpTable() throws Exception {

}

private S3DestinationConfig getS3DestinationConfig(S3Config s3Config, String stagingFolder) {
return new S3DestinationConfig(
s3Config.getEndpoint(),
s3Config.getBucketName(),
stagingFolder,
s3Config.getRegion(),
s3Config.getAccessKeyId(),
s3Config.getSecretAccessKey(),
// use default parquet format config
new S3ParquetFormatConfig(MAPPER.createObjectNode())
);
}

}
@@ -0,0 +1,43 @@
package io.airbyte.integrations.destination.databricks;

import com.amazonaws.services.s3.AmazonS3;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.destination.jdbc.SqlOperations;
import io.airbyte.integrations.destination.jdbc.copy.StreamCopier;
import io.airbyte.integrations.destination.jdbc.copy.StreamCopierFactory;
import io.airbyte.integrations.destination.jdbc.copy.s3.S3Config;
import io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopier;
import io.airbyte.integrations.destination.s3.parquet.S3ParquetWriter;
import io.airbyte.integrations.destination.s3.writer.ProductionWriterFactory;
import io.airbyte.integrations.destination.s3.writer.S3WriterFactory;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import java.sql.Timestamp;

public class DatabricksStreamCopierFactory implements StreamCopierFactory<S3Config> {

@Override
public StreamCopier create(String configuredSchema,
S3Config s3Config,
String stagingFolder,
ConfiguredAirbyteStream configuredStream,
ExtendedNameTransformer nameTransformer,
JdbcDatabase db,
SqlOperations sqlOperations) {
try {
AirbyteStream stream = configuredStream.getStream();
String schema = StreamCopierFactory.getSchema(stream, configuredSchema, nameTransformer);
AmazonS3 s3Client = S3StreamCopier.getAmazonS3(s3Config);
S3WriterFactory writerFactory = new ProductionWriterFactory();
Timestamp uploadTimestamp = new Timestamp(System.currentTimeMillis());

return new DatabricksStreamCopier(
stagingFolder, schema, configuredStream, s3Client, db, s3Config, nameTransformer, sqlOperations, writerFactory, uploadTimestamp);
} catch (Exception e) {
throw new RuntimeException(e);
}

}

}
@@ -24,7 +24,6 @@

package io.airbyte.integrations.destination.jdbc.copy;

import io.airbyte.commons.json.Jsons;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
@@ -37,8 +36,6 @@
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -94,8 +91,7 @@ private static <T> Map<AirbyteStreamNameNamespacePair, StreamCopier> createWrite
for (var configuredStream : catalog.getStreams()) {
var stream = configuredStream.getStream();
var pair = AirbyteStreamNameNamespacePair.fromAirbyteSteam(stream);
var syncMode = configuredStream.getDestinationSyncMode();
var copier = streamCopierFactory.create(defaultSchema, config, stagingFolder, syncMode, stream, namingResolver, database, sqlOperations);
var copier = streamCopierFactory.create(defaultSchema, config, stagingFolder, configuredStream, namingResolver, database, sqlOperations);

pairToCopier.put(pair, copier);
}
@@ -116,8 +112,7 @@ private static RecordWriter recordWriterFunction(Map<AirbyteStreamNameNamespaceP
if (sqlOperations.isValidData(recordMessage.getData())) {
// TODO Truncate json data instead of throwing whole record away?
// or should we upload it into a special rejected record folder in s3 instead?
var emittedAt = Timestamp.from(Instant.ofEpochMilli(recordMessage.getEmittedAt()));
pairToCopier.get(pair).write(id, Jsons.serialize(recordMessage.getData()), emittedAt);
pairToCopier.get(pair).write(id, recordMessage);
} else {
pairToIgnoredRecordCount.put(pair, pairToIgnoredRecordCount.getOrDefault(pair, 0L) + 1L);
}
@@ -24,6 +24,7 @@

package io.airbyte.integrations.destination.jdbc.copy;

import io.airbyte.protocol.models.AirbyteRecordMessage;
import java.sql.Timestamp;
import java.util.UUID;

@@ -36,7 +37,7 @@ public interface StreamCopier {
/**
* Writes a value to a staging file for the stream.
*/
void write(UUID id, String jsonDataString, Timestamp emittedAt) throws Exception;
void write(UUID id, AirbyteRecordMessage recordMessage) throws Exception;
Contributor Author:
This interface change is necessary to drop in the S3ParquetWriter directly for convenience.


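For comparison, an existing CSV-based copier can adapt by deriving the serialized data and timestamp from the record itself. An illustrative sketch, not the actual S3StreamCopier change; csvPrinter stands for whatever staging writer the copier already holds:

  @Override
  public void write(UUID id, AirbyteRecordMessage recordMessage) throws Exception {
    // Reconstruct the values the old signature used to pass in explicitly.
    final Timestamp emittedAt = Timestamp.from(Instant.ofEpochMilli(recordMessage.getEmittedAt()));
    csvPrinter.printRecord(id, Jsons.serialize(recordMessage.getData()), emittedAt);
  }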
/**
* Closes the writer for the stream to the staging persistence. This method should block until all
@@ -28,17 +28,25 @@
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.destination.jdbc.SqlOperations;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.DestinationSyncMode;

public interface StreamCopierFactory<T> {

StreamCopier create(String configuredSchema,
T config,
String stagingFolder,
DestinationSyncMode syncMode,
AirbyteStream stream,
ConfiguredAirbyteStream configuredStream,
Contributor Author:
This interface change is necessary to reuse the S3Writer.

ExtendedNameTransformer nameTransformer,
JdbcDatabase db,
SqlOperations sqlOperations);

static String getSchema(AirbyteStream stream, String configuredSchema, ExtendedNameTransformer nameTransformer) {
if (stream.getNamespace() != null) {
return nameTransformer.convertStreamName(stream.getNamespace());
} else {
return nameTransformer.convertStreamName(configuredSchema);
}
}

}