Implement databricks connector as a copy destination #5748
Conversation
@@ -15,6 +15,14 @@ dependencies {
     implementation project(':airbyte-integrations:bases:base-java')
     implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)
     implementation project(':airbyte-integrations:connectors:destination-jdbc')
+    implementation project(':airbyte-integrations:connectors:destination-s3')
Let's depend on the s3 destination for now and DRY it later.
@Override
public AirbyteConnectionStatus check(JsonNode config) {
The `CopyDestination` can check or create the SQL tables by calling the abstract method, so we only need to check the persistence in this implementation.
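For context, a minimal sketch of what that leaves this override to do, assuming the S3 staging persistence; the `attemptS3WriteAndDelete` helper is a hypothetical name, not the PR's actual code:

```java
// Hypothetical sketch: CopyDestination's template method covers the SQL-table
// check, so this override only verifies the staging persistence.
@Override
public AirbyteConnectionStatus check(JsonNode config) {
  try {
    // Assumed helper that writes and deletes a test object in the S3 bucket.
    attemptS3WriteAndDelete(config);
    return new AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.SUCCEEDED);
  } catch (Exception e) {
    return new AirbyteConnectionStatus()
        .withStatus(AirbyteConnectionStatus.Status.FAILED)
        .withMessage("Could not write to the staging persistence: " + e.getMessage());
  }
}
```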
 * This implementation is similar to {@link io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopier}.
 * The difference is that this implementation creates Parquet staging files, instead of CSV ones.
 */
public class DatabricksStreamCopier implements StreamCopier {
We can focus on implementing the methods in this class to transfer the data from S3 to Databricks.
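For example, the S3-to-Databricks transfer could boil down to a `COPY INTO` statement over the Parquet staging files. A rough sketch, assuming the copier holds a `JdbcDatabase` plus the usual naming fields; none of this is the PR's final code:

```java
// Hypothetical sketch of the S3 -> Databricks step. Databricks SQL can load
// Parquet files from an S3 path via COPY INTO. The field names here
// (database, schemaName, tmpTableName, s3BucketName, stagingFolder) are
// illustrative assumptions, not taken from the PR.
public void copyStagingFileToTemporaryTable() throws Exception {
  final String copyQuery = String.format(
      "COPY INTO %s.%s FROM 's3://%s/%s' FILEFORMAT = PARQUET",
      schemaName, tmpTableName, s3BucketName, stagingFolder);
  database.execute(copyQuery);
}
```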
@@ -36,7 +37,7 @@
   /**
    * Writes a value to a staging file for the stream.
    */
-  void write(UUID id, String jsonDataString, Timestamp emittedAt) throws Exception;
+  void write(UUID id, AirbyteRecordMessage recordMessage) throws Exception;
This interface change is necessary to drop in the `S3ParquetWriter` directly for convenience.
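Concretely, once the signatures match, the copier's `write` can forward each record straight to the writer. A sketch, where `parquetWriter` is an assumed `S3ParquetWriter` field on the copier:

```java
// Sketch: with the new signature, no JSON re-serialization is needed; the
// record goes directly to the Parquet staging writer.
@Override
public void write(UUID id, AirbyteRecordMessage recordMessage) throws Exception {
  parquetWriter.write(id, recordMessage);
}
```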
import io.airbyte.protocol.models.DestinationSyncMode;

public interface StreamCopierFactory<T> {

  StreamCopier create(String configuredSchema,
                      T config,
                      String stagingFolder,
                      DestinationSyncMode syncMode,
                      AirbyteStream stream,
                      ConfiguredAirbyteStream configuredStream,
This interface change is necessary to reuse the `S3Writer`.
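The extra `ConfiguredAirbyteStream` parameter is what the S3 writer factory needs to derive the output schema and paths. Roughly, using the destination-s3 factory types with assumed variable names:

```java
// Sketch: constructing the staging writer from inside the copier. The full
// ConfiguredAirbyteStream (not just the stream name) is required here, which
// is why it is threaded through StreamCopierFactory.create.
final S3WriterFactory writerFactory = new ProductionWriterFactory();
final S3Writer parquetWriter =
    writerFactory.create(s3Config, s3Client, configuredStream, uploadTimestamp);
```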
@tuliren I'll merge this in and then work off it
`JdbcDestination` to `CopyDestination`.