Skip to content

MSSQL remove normalization #36050

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.2.0'
cdkVersionRequired = '0.30.2'
features = [
'db-sources', // required for tests
'db-destinations',
's3-destinations',
'typing-deduping'
]
useLocalCdk = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,23 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: d4353156-9217-4cad-8dd7-c108fd4f74cf
dockerImageTag: 0.2.0
dockerImageTag: 1.0.0
dockerRepository: airbyte/destination-mssql-strict-encrypt
githubIssueLabel: destination-mssql
icon: mssql.svg
license: ELv2
name: MS SQL Server
normalizationConfig:
normalizationIntegrationType: mssql
normalizationRepository: airbyte/normalization-mssql
normalizationTag: 0.4.1
releaseStage: alpha
releases:
breakingChanges:
1.0.0:
upgradeDeadline: "2024-05-25"
message: >
This version removes the option to use "normalization" with MSSQL. It also changes
the schema and database of Airbyte's "raw" tables to be compatible with the new
[Destinations V2](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#what-is-destinations-v2)
format. These changes will likely require updates to downstream dbt / SQL models.
Selecting `Upgrade` will upgrade **all** connections using this destination at their next sync.
documentationUrl: https://docs.airbyte.com/integrations/destinations/mssql
supportsDbt: true
tags:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,11 @@
import org.jooq.DSLContext;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.MSSQLServerContainer;

@Disabled("Disabled after DV2 migration. Re-enable with fixtures updated to DV2.")
public class MssqlStrictEncryptDestinationAcceptanceTest extends DestinationAcceptanceTest {

private static MSSQLServerContainer<?> db;
Expand Down Expand Up @@ -167,7 +169,7 @@ protected void setup(final TestDestinationEnv testEnv, final HashSet<String> TES

@Override
protected void tearDown(final TestDestinationEnv testEnv) {
dslContext.close();
// do nothing
}

@AfterAll
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.2.0'
cdkVersionRequired = '0.30.2'
features = [
'db-sources', // required for tests
'db-destinations',
's3-destinations',
'typing-deduping'
]
useLocalCdk = false
}
Expand Down
16 changes: 11 additions & 5 deletions airbyte-integrations/connectors/destination-mssql/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,29 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: d4353156-9217-4cad-8dd7-c108fd4f74cf
dockerImageTag: 0.2.0
dockerImageTag: 1.0.0
dockerRepository: airbyte/destination-mssql
githubIssueLabel: destination-mssql
icon: mssql.svg
license: ELv2
name: MS SQL Server
normalizationConfig:
normalizationIntegrationType: mssql
normalizationRepository: airbyte/normalization-mssql
normalizationTag: 0.4.3
registries:
cloud:
dockerRepository: airbyte/destination-mssql-strict-encrypt
enabled: true
oss:
enabled: true
releaseStage: alpha
releases:
breakingChanges:
1.0.0:
upgradeDeadline: "2024-05-25"
message: >
This version removes the option to use "normalization" with MSSQL. It also changes
the schema and database of Airbyte's "raw" tables to be compatible with the new
[Destinations V2](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#what-is-destinations-v2)
format. These changes will likely require updates to downstream dbt / SQL models.
Selecting `Upgrade` will upgrade **all** connections using this destination at their next sync.
documentationUrl: https://docs.airbyte.com/integrations/destinations/mssql
supportsDbt: true
tags:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,28 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.cdk.db.factory.DatabaseDriver;
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.db.jdbc.JdbcUtils;
import io.airbyte.cdk.integrations.base.Destination;
import io.airbyte.cdk.integrations.base.IntegrationRunner;
import io.airbyte.cdk.integrations.base.ssh.SshWrappedDestination;
import io.airbyte.cdk.integrations.destination.jdbc.AbstractJdbcDestination;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.NoOpJdbcDestinationHandler;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.RawOnlySqlGenerator;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler;
import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator;
import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration;
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState;
import java.io.File;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.jetbrains.annotations.NotNull;
import org.jooq.SQLDialect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -30,6 +42,7 @@ public MSSQLDestination() {
super(DRIVER_CLASS, new MSSQLNameTransformer(), new SqlServerOperations());
}

@NotNull
@Override
protected Map<String, String> getDefaultConnectionProperties(final JsonNode config) {
final HashMap<String, String> properties = new HashMap<>();
Expand Down Expand Up @@ -57,6 +70,7 @@ protected Map<String, String> getDefaultConnectionProperties(final JsonNode conf
return properties;
}

@NotNull
@Override
public JsonNode toJdbcConfig(final JsonNode config) {
final String schema = Optional.ofNullable(config.get("schema")).map(JsonNode::asText).orElse("public");
Expand All @@ -81,6 +95,22 @@ public JsonNode toJdbcConfig(final JsonNode config) {
return Jsons.jsonNode(configBuilder.build());
}

@Override
protected JdbcDestinationHandler<? extends MinimumDestinationState> getDestinationHandler(final String databaseName,
final JdbcDatabase database,
final String rawTableSchema) {
return new NoOpJdbcDestinationHandler<>(databaseName, database, rawTableSchema, SQLDialect.DEFAULT);
}

@NotNull
@Override
protected List<Migration> getMigrations(final JdbcDatabase database,
final String databaseName,
final SqlGenerator sqlGenerator,
final DestinationHandler destinationHandler) {
return List.of();
}

private String getTrustStoreLocation() {
// trust store location code found at https://stackoverflow.com/a/56570588
final String trustStoreLocation = Optional.ofNullable(System.getProperty("javax.net.ssl.trustStore"))
Expand All @@ -104,4 +134,20 @@ public static void main(final String[] args) throws Exception {
LOGGER.info("completed destination: {}", MSSQLDestination.class);
}

@Override
public boolean isV2Destination() {
return true;
}

@Override
protected boolean shouldAlwaysDisableTypeDedupe() {
return true;
}

@NotNull
@Override
protected JdbcSqlGenerator getSqlGenerator(@NotNull final JsonNode config) {
return new RawOnlySqlGenerator(new MSSQLNameTransformer());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,26 @@
package io.airbyte.integrations.destination.mssql;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.integrations.base.JavaBaseConstants;
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage;
import io.airbyte.cdk.integrations.destination.jdbc.SqlOperations;
import io.airbyte.cdk.integrations.destination.jdbc.SqlOperationsUtils;
import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SqlServerOperations implements SqlOperations {

private static final Logger LOGGER = LoggerFactory.getLogger(SqlServerOperations.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

@Override
public void createSchemaIfNotExists(final JdbcDatabase database, final String schemaName) throws Exception {
final String query = String.format("IF NOT EXISTS ( SELECT * FROM sys.schemas WHERE name = '%s') EXEC('CREATE SCHEMA [%s]')",
Expand All @@ -37,10 +46,12 @@ public String createTableQuery(final JdbcDatabase database, final String schemaN
+ "CREATE TABLE %s.%s ( \n"
+ "%s VARCHAR(64) PRIMARY KEY,\n"
+ "%s NVARCHAR(MAX),\n" // Microsoft SQL Server specific: NVARCHAR can store Unicode meanwhile VARCHAR - not
+ "%s DATETIMEOFFSET(7) DEFAULT SYSDATETIMEOFFSET()\n"
+ "%s DATETIMEOFFSET(7) DEFAULT SYSDATETIMEOFFSET(),\n"
+ "%s DATETIMEOFFSET(7),\n"
+ "%s NVARCHAR(MAX),\n"
+ ");\n",
schemaName, tableName, schemaName, tableName, JavaBaseConstants.COLUMN_NAME_AB_ID, JavaBaseConstants.COLUMN_NAME_DATA,
JavaBaseConstants.COLUMN_NAME_EMITTED_AT);
schemaName, tableName, schemaName, tableName, JavaBaseConstants.COLUMN_NAME_AB_RAW_ID, JavaBaseConstants.COLUMN_NAME_DATA,
JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT, JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT, JavaBaseConstants.COLUMN_NAME_AB_META);
}

@Override
Expand All @@ -60,30 +71,60 @@ public String truncateTableQuery(final JdbcDatabase database, final String schem

@Override
public void insertRecords(final JdbcDatabase database,
final List<AirbyteRecordMessage> records,
final List<PartialAirbyteMessage> records,
final String schemaName,
final String tempTableName)
throws SQLException {
// MSSQL has a limitation of 2100 parameters used in a query
// Airbyte inserts data with 3 columns (raw table) this limits to 700 records.
// Limited the variable to 500 records to
final int MAX_BATCH_SIZE = 500;
final int MAX_BATCH_SIZE = 400;
final String insertQueryComponent = String.format(
"INSERT INTO %s.%s (%s, %s, %s) VALUES\n",
"INSERT INTO %s.%s (%s, %s, %s, %s, %s) VALUES\n",
schemaName,
tempTableName,
JavaBaseConstants.COLUMN_NAME_AB_ID,
JavaBaseConstants.COLUMN_NAME_AB_RAW_ID,
JavaBaseConstants.COLUMN_NAME_DATA,
JavaBaseConstants.COLUMN_NAME_EMITTED_AT);
final String recordQueryComponent = "(?, ?, ?),\n";
final List<List<AirbyteRecordMessage>> batches = Lists.partition(records, MAX_BATCH_SIZE);
batches.forEach(record -> {
try {
SqlOperationsUtils.insertRawRecordsInSingleQuery(insertQueryComponent, recordQueryComponent, database, record);
} catch (final SQLException e) {
e.printStackTrace();
JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT,
JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT,
JavaBaseConstants.COLUMN_NAME_AB_META);
final String recordQueryComponent = "(?, ?, ?, ?, ?),\n";
final List<List<PartialAirbyteMessage>> batches = Lists.partition(records, MAX_BATCH_SIZE);
for (List<PartialAirbyteMessage> batch : batches) {
if (batch.isEmpty()) {
continue;
}
});
database.execute(connection -> {
final StringBuilder sqlStatement = new StringBuilder(insertQueryComponent);
for (PartialAirbyteMessage ignored : batch) {
sqlStatement.append(recordQueryComponent);
}
final var sql = sqlStatement.substring(0, sqlStatement.length() - 2) + ";";
try (final var statement = connection.prepareStatement(sql)) {
int i = 1;
for (PartialAirbyteMessage record : batch) {
final var id = UUID.randomUUID().toString();
statement.setString(i++, id);
statement.setString(i++, record.getSerialized());
statement.setTimestamp(i++, Timestamp.from(Instant.ofEpochMilli(Objects.requireNonNull(record.getRecord()).getEmittedAt())));
statement.setTimestamp(i++, null);
String metadata;
if (record.getRecord().getMeta() != null) {
try {
metadata = OBJECT_MAPPER.writeValueAsString(record.getRecord().getMeta());
} catch (Exception e) {
LOGGER.error("Failed to serialize record metadata for record {}", id, e);
metadata = null;
}
} else {
metadata = null;
}
statement.setString(i++, metadata);
}
statement.execute();
}
});
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,12 @@
}
}
]
},
"raw_data_schema": {
"type": "string",
"description": "The schema to write raw tables into (default: airbyte_internal)",
"title": "Raw Table Schema Name",
"order": 7
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,15 @@
import org.jooq.DSLContext;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.testcontainers.containers.MSSQLServerContainer;

@Disabled("Disabled after DV2 migration. Re-enable with fixtures updated to DV2.")
public class MSSQLDestinationAcceptanceTest extends JdbcDestinationAcceptanceTest {

private static MSSQLServerContainer<?> db;
private final StandardNameTransformer namingResolver = new StandardNameTransformer();
private JsonNode config;
private DSLContext dslContext;

@Override
protected String getImageName() {
Expand Down Expand Up @@ -93,17 +94,16 @@ protected List<JsonNode> retrieveNormalizedRecords(final TestDestinationEnv env,
}

private List<JsonNode> retrieveRecordsFromTable(final String tableName, final String schemaName) throws SQLException {
try (final DSLContext dslContext = DatabaseConnectionHelper.createDslContext(db, null)) {
return getDatabase(dslContext).query(
ctx -> {
ctx.fetch(String.format("USE %s;", config.get(JdbcUtils.DATABASE_KEY)));
return ctx
.fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName, JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
.stream()
.map(this::getJsonFromRecord)
.collect(Collectors.toList());
});
}
final DSLContext dslContext = DatabaseConnectionHelper.createDslContext(db, null);
return getDatabase(dslContext).query(
ctx -> {
ctx.fetch(String.format("USE %s;", config.get(JdbcUtils.DATABASE_KEY)));
return ctx
.fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName, JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT))
.stream()
.map(this::getJsonFromRecord)
.collect(Collectors.toList());
});
}

@BeforeAll
Expand Down Expand Up @@ -134,7 +134,7 @@ private static Database getDatabase(final DSLContext dslContext) {
protected void setup(final TestDestinationEnv testEnv, final HashSet<String> TEST_SCHEMAS) throws SQLException {
final JsonNode configWithoutDbName = getConfig(db);
final String dbName = Strings.addRandomSuffix("db", "_", 10);
dslContext = getDslContext(configWithoutDbName);
DSLContext dslContext = getDslContext(configWithoutDbName);
final Database database = getDatabase(dslContext);
database.query(ctx -> {
ctx.fetch(String.format("CREATE DATABASE %s;", dbName));
Expand All @@ -150,8 +150,9 @@ protected void setup(final TestDestinationEnv testEnv, final HashSet<String> TES
}

@Override
protected void tearDown(final TestDestinationEnv testEnv) {
dslContext.close();
protected void tearDown(final TestDestinationEnv testEnv) throws Exception {
db.stop();
db.close();
}

@Override
Expand Down
Loading
Loading