Implement databricks connector as a copy destination #5748

Changes from all commits
DatabricksDestination.java
@@ -25,93 +25,60 @@
package io.airbyte.integrations.destination.databricks;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Databases;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.destination.jdbc.AbstractJdbcDestination;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
import java.io.File;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.destination.jdbc.SqlOperations;
import io.airbyte.integrations.destination.jdbc.copy.CopyConsumerFactory;
import io.airbyte.integrations.destination.jdbc.copy.CopyDestination;
import io.airbyte.integrations.destination.jdbc.copy.s3.S3Config;
import io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopier;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.util.function.Consumer;

public class DatabricksDestination extends AbstractJdbcDestination implements Destination {
public class DatabricksDestination extends CopyDestination {

  private static final Logger LOGGER = LoggerFactory.getLogger(DatabricksDestination.class);

  public static final String DRIVER_CLASS = "com.simba.spark.jdbc.Driver";

  // TODO: this isn't working yet!
  public static void getDriver() throws MalformedURLException, ClassNotFoundException {
    File driverJar = new File("/Users/phlair/Downloads/SparkDriver/SparkJDBC42.jar");
    URL jarUrl = new URL("jar", "", "file:" + driverJar.getAbsolutePath() + "!/");
    URLClassLoader myLoader = new URLClassLoader(new URL[] { jarUrl });
    myLoader.loadClass(DRIVER_CLASS);
  }
  private static final String DRIVER_CLASS = "com.simba.spark.jdbc.Driver";

  @Override
  public AirbyteConnectionStatus check(JsonNode config) {
    try (final JdbcDatabase database = getDatabase(config)) {
      DatabricksSqlOperations databricksSqlOperations = (DatabricksSqlOperations) getSqlOperations();

      String outputSchema = getNamingResolver().getIdentifier(config.get("database").asText());
      attemptSQLCreateAndDropTableOperations(outputSchema, database, getNamingResolver(), databricksSqlOperations);

      databricksSqlOperations.verifyLocalFileEnabled(database);

      // TODO: enforce databricks runtime version instead of this mySql code
      // VersionCompatibility compatibility = dbSqlOperations.isCompatibleVersion(database);
      // if (!compatibility.isCompatible()) {
      //   throw new RuntimeException(String
      //       .format("Your MySQL version %s is not compatible with Airbyte",
      //           compatibility.getVersion()));
      // }

      return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED);
    } catch (Exception e) {
      LOGGER.error("Exception while checking connection: ", e);
      return new AirbyteConnectionStatus()
          .withStatus(Status.FAILED)
          .withMessage("Could not connect with provided configuration. \n" + e.getMessage());
    }
  public AirbyteMessageConsumer getConsumer(JsonNode config, ConfiguredAirbyteCatalog catalog, Consumer<AirbyteMessage> outputRecordCollector) {
    return CopyConsumerFactory.create(
        outputRecordCollector,
        getDatabase(config),
        getSqlOperations(),
        getNameTransformer(),
        S3Config.getS3Config(config),
        catalog,
        new DatabricksStreamCopierFactory(),
        config.get("schema").asText()
    );
  }

  public DatabricksDestination() {
    super(DRIVER_CLASS, new DatabricksNameTransformer(), new DatabricksSqlOperations());
  @Override
  public void checkPersistence(JsonNode config) {
    S3StreamCopier.attemptS3WriteAndDelete(S3Config.getS3Config(config));
  }

  @Override
  public JsonNode toJdbcConfig(JsonNode databricksConfig) {
    return getJdbcConfig(databricksConfig);
  public ExtendedNameTransformer getNameTransformer() {
    return new DatabricksNameTransformer();
  }

  public static JsonNode getJdbcConfig(JsonNode databricksConfig) {
    final String schema = Optional.ofNullable(databricksConfig.get("schema")).map(JsonNode::asText).orElse("default");

    return Jsons.jsonNode(ImmutableMap.builder()
        .put("username", "dummy")
        .put("password", "dummy")
        // .put("jdbc_url", String.format("jdbc:TODO://%s:%s/%s",
        //     databricksConfig.get("host").asText(),
        //     databricksConfig.get("port").asText(),
        //     databricksConfig.get("database").asText()))
        // .put("schema", schema)
        .put("jdbc_url", databricksConfig.get("jdbcUrl").asText())
        .build());
  @Override
  public JdbcDatabase getDatabase(JsonNode databricksConfig) {
    return Databases.createJdbcDatabase(
        databricksConfig.get("username").asText(),
        databricksConfig.has("password") ? databricksConfig.get("password").asText() : null,
        databricksConfig.get("jdbc_url").asText(),
        DRIVER_CLASS
    );
  }

  public static void main(String[] args) throws Exception {
    LOGGER.info("starting destination: {}", DatabricksDestination.class);
    getDriver();
    new IntegrationRunner(new DatabricksDestination()).run(args);
    LOGGER.info("completed destination: {}", DatabricksDestination.class);
  @Override
  public SqlOperations getSqlOperations() {
    return new DatabricksSqlOperations();
  }

}
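For orientation, here is a minimal sketch (not part of the diff) of a config object shaped the way the new CopyDestination methods read it: getDatabase() uses "jdbc_url", "username" and an optional "password", and getConsumer() reads "schema". All values are placeholders, the S3 staging keys consumed by S3Config.getS3Config(config) are omitted because they are not visible in this diff, and note that the removed getJdbcConfig() read "jdbcUrl" while the new code reads "jdbc_url", so the exact spec key may still be in flux.

// Illustrative only: a config carrying the keys the new CopyDestination methods expect.
// Every value below is a placeholder, not a real endpoint or credential.
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;

public class DatabricksConfigSketch {

  public static JsonNode exampleConfig() {
    return Jsons.jsonNode(ImmutableMap.builder()
        .put("jdbc_url", "jdbc:spark://<workspace-host>:443/default") // placeholder JDBC URL for the Simba Spark driver
        .put("username", "token")                                     // placeholder
        .put("password", "<personal-access-token>")                   // placeholder
        .put("schema", "default")                                     // read by getConsumer()
        .build());
  }
}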
DatabricksStreamCopier.java (new file)
@@ -0,0 +1,114 @@
package io.airbyte.integrations.destination.databricks;

import com.amazonaws.services.s3.AmazonS3;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.destination.jdbc.SqlOperations;
import io.airbyte.integrations.destination.jdbc.copy.StreamCopier;
import io.airbyte.integrations.destination.jdbc.copy.s3.S3Config;
import io.airbyte.integrations.destination.s3.S3DestinationConfig;
import io.airbyte.integrations.destination.s3.parquet.S3ParquetFormatConfig;
import io.airbyte.integrations.destination.s3.parquet.S3ParquetWriter;
import io.airbyte.integrations.destination.s3.writer.S3WriterFactory;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import java.sql.Timestamp;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This implementation is similar to {@link io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopier}.
 * The difference is that this implementation creates Parquet staging files, instead of CSV ones.
 */
public class DatabricksStreamCopier implements StreamCopier {
Review comment: We can focus on implementing the methods in this class to transfer the data from S3 to Databricks. (A hedged sketch of what one of these methods could issue follows this file's diff.)
  private static final Logger LOGGER = LoggerFactory.getLogger(DatabricksStreamCopier.class);
  private static final ObjectMapper MAPPER = new ObjectMapper();

  private final AmazonS3 s3Client;
  private final S3Config s3Config;
  private final String tmpTableName;
  private final AirbyteStream stream;
  private final JdbcDatabase db;
  private final ExtendedNameTransformer nameTransformer;
  private final SqlOperations sqlOperations;
  private final S3ParquetWriter parquetWriter;

  public DatabricksStreamCopier(String stagingFolder,
                                String schema,
                                ConfiguredAirbyteStream configuredStream,
                                AmazonS3 s3Client,
                                JdbcDatabase db,
                                S3Config s3Config,
                                ExtendedNameTransformer nameTransformer,
                                SqlOperations sqlOperations,
                                S3WriterFactory writerFactory,
                                Timestamp uploadTime) throws Exception {
    this.stream = configuredStream.getStream();
    this.db = db;
    this.nameTransformer = nameTransformer;
    this.sqlOperations = sqlOperations;
    this.tmpTableName = nameTransformer.getTmpTableName(stream.getName());
    this.s3Client = s3Client;
    this.s3Config = s3Config;
    this.parquetWriter = (S3ParquetWriter) writerFactory
        .create(getS3DestinationConfig(s3Config, stagingFolder), s3Client, configuredStream, uploadTime);
  }

  @Override
  public void write(UUID id, AirbyteRecordMessage recordMessage) throws Exception {
    parquetWriter.write(id, recordMessage);
  }

  @Override
  public void closeStagingUploader(boolean hasFailed) throws Exception {
    parquetWriter.close(hasFailed);
  }

  @Override
  public void createTemporaryTable() throws Exception {

  }

  @Override
  public void copyStagingFileToTemporaryTable() throws Exception {

  }

  @Override
  public void createDestinationSchema() throws Exception {

  }

  @Override
  public String createDestinationTable() throws Exception {
    return null;
  }

  @Override
  public String generateMergeStatement(String destTableName) throws Exception {
    return null;
  }

  @Override
  public void removeFileAndDropTmpTable() throws Exception {

  }

  private S3DestinationConfig getS3DestinationConfig(S3Config s3Config, String stagingFolder) {
    return new S3DestinationConfig(
        s3Config.getEndpoint(),
        s3Config.getBucketName(),
        stagingFolder,
        s3Config.getRegion(),
        s3Config.getAccessKeyId(),
        s3Config.getSecretAccessKey(),
        // use default parquet format config
        new S3ParquetFormatConfig(MAPPER.createObjectNode())
    );
  }

}
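Following up on the review comment above, here is a minimal sketch (not part of this PR) of how the currently empty copyStagingFileToTemporaryTable() step could move a staged Parquet object into Databricks with a COPY INTO statement. The helper only assembles the SQL string; the argument names and the assumption that the staging object lives under the configured bucket are illustrative.

// Illustrative only: builds a Databricks SQL COPY INTO statement that would load a
// staged Parquet object from S3 into the tmp table created for the stream.
// None of these names come from the PR; they are placeholders for the sketch.
public class DatabricksCopyIntoSketch {

  public static String buildCopyIntoQuery(String schemaName, String tmpTableName, String bucketName, String stagingObjectKey) {
    String s3Path = String.format("s3://%s/%s", bucketName, stagingObjectKey);
    return String.format("COPY INTO %s.%s FROM '%s' FILEFORMAT = PARQUET", schemaName, tmpTableName, s3Path);
  }

  public static void main(String[] args) {
    // Example usage with made-up names; prints the statement the copier could execute over JDBC.
    System.out.println(buildCopyIntoQuery("public", "_airbyte_tmp_users", "my-staging-bucket", "staging/users.parquet"));
  }
}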
DatabricksStreamCopierFactory.java (new file)
@@ -0,0 +1,43 @@
package io.airbyte.integrations.destination.databricks;

import com.amazonaws.services.s3.AmazonS3;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.destination.jdbc.SqlOperations;
import io.airbyte.integrations.destination.jdbc.copy.StreamCopier;
import io.airbyte.integrations.destination.jdbc.copy.StreamCopierFactory;
import io.airbyte.integrations.destination.jdbc.copy.s3.S3Config;
import io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopier;
import io.airbyte.integrations.destination.s3.parquet.S3ParquetWriter;
import io.airbyte.integrations.destination.s3.writer.ProductionWriterFactory;
import io.airbyte.integrations.destination.s3.writer.S3WriterFactory;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import java.sql.Timestamp;

public class DatabricksStreamCopierFactory implements StreamCopierFactory<S3Config> {

  @Override
  public StreamCopier create(String configuredSchema,
                             S3Config s3Config,
                             String stagingFolder,
                             ConfiguredAirbyteStream configuredStream,
                             ExtendedNameTransformer nameTransformer,
                             JdbcDatabase db,
                             SqlOperations sqlOperations) {
    try {
      AirbyteStream stream = configuredStream.getStream();
      String schema = StreamCopierFactory.getSchema(stream, configuredSchema, nameTransformer);
      AmazonS3 s3Client = S3StreamCopier.getAmazonS3(s3Config);
      S3WriterFactory writerFactory = new ProductionWriterFactory();
      Timestamp uploadTimestamp = new Timestamp(System.currentTimeMillis());

      return new DatabricksStreamCopier(
          stagingFolder, schema, configuredStream, s3Client, db, s3Config, nameTransformer, sqlOperations, writerFactory, uploadTimestamp);
    } catch (Exception e) {
      throw new RuntimeException(e);
    }

  }

}
StreamCopier.java
@@ -24,6 +24,7 @@
package io.airbyte.integrations.destination.jdbc.copy;

import io.airbyte.protocol.models.AirbyteRecordMessage;
import java.sql.Timestamp;
import java.util.UUID;

@@ -36,7 +37,7 @@ public interface StreamCopier {
  /**
   * Writes a value to a staging file for the stream.
   */
  void write(UUID id, String jsonDataString, Timestamp emittedAt) throws Exception;
  void write(UUID id, AirbyteRecordMessage recordMessage) throws Exception;
Review comment: This interface change is necessary to drop in the … (a hedged sketch of adapting the old signature follows this hunk.)

  /**
   * Closes the writer for the stream to the staging persistence. This method should block until all
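To illustrate the review comment above (this adapter is hypothetical, not part of the diff): a copier that previously implemented write(UUID, String, Timestamp) can recover both the serialized JSON and the emitted-at timestamp from the AirbyteRecordMessage, so the signature change is a drop-in for existing copiers while letting the Databricks copier hand the whole record to its Parquet writer.

// Hypothetical adapter, for illustration only: bridges the old (jsonDataString, emittedAt)
// style onto the new write(UUID, AirbyteRecordMessage) signature.
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.destination.jdbc.copy.StreamCopier;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.UUID;

public abstract class LegacyWriteAdapterSketch implements StreamCopier {

  @Override
  public void write(UUID id, AirbyteRecordMessage recordMessage) throws Exception {
    // The record carries everything the old signature passed explicitly.
    String jsonDataString = Jsons.serialize(recordMessage.getData());
    Timestamp emittedAt = Timestamp.from(Instant.ofEpochMilli(recordMessage.getEmittedAt()));
    writeLegacy(id, jsonDataString, emittedAt);
  }

  // Stand-in for the previous write(UUID, String, Timestamp) body of a CSV-style copier.
  protected abstract void writeLegacy(UUID id, String jsonDataString, Timestamp emittedAt) throws Exception;
}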
StreamCopierFactory.java
@@ -28,17 +28,25 @@
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.destination.jdbc.SqlOperations;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.DestinationSyncMode;

public interface StreamCopierFactory<T> {

  StreamCopier create(String configuredSchema,
                      T config,
                      String stagingFolder,
                      DestinationSyncMode syncMode,
                      AirbyteStream stream,
                      ConfiguredAirbyteStream configuredStream,
Review comment: This interface change is necessary to reuse the … (see the sketch after this file's diff.)
                      ExtendedNameTransformer nameTransformer,
                      JdbcDatabase db,
                      SqlOperations sqlOperations);

  static String getSchema(AirbyteStream stream, String configuredSchema, ExtendedNameTransformer nameTransformer) {
    if (stream.getNamespace() != null) {
      return nameTransformer.convertStreamName(stream.getNamespace());
    } else {
      return nameTransformer.convertStreamName(configuredSchema);
    }
  }

}
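A short illustration (not from this diff) of why passing the whole ConfiguredAirbyteStream suffices: implementations that used the removed syncMode and stream parameters can still obtain both from it.

// Illustration only: both values the old create(...) signature passed separately
// are reachable from the ConfiguredAirbyteStream argument.
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.DestinationSyncMode;

public class ConfiguredStreamAccessSketch {

  public static void show(ConfiguredAirbyteStream configuredStream) {
    AirbyteStream stream = configuredStream.getStream();                      // same call the factory above makes
    DestinationSyncMode syncMode = configuredStream.getDestinationSyncMode(); // the sync mode the old parameter carried
    System.out.println(stream.getName() + " -> " + syncMode);
  }
}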
Review comment: Let's depend on the s3 destination for now and DRY it later.