Skip to content

Commit 830fac6

Browse files
authored
Turn on MYSQL normalization flag. (#4651)
* Turn on normalization flag. Bump versions
1 parent 21c961e commit 830fac6

File tree

12 files changed

+110
-26
lines changed

12 files changed

+110
-26
lines changed

airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/ca81ee7c-3163-4246-af40-094cc31e5e42.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,6 @@
22
"destinationDefinitionId": "ca81ee7c-3163-4246-af40-094cc31e5e42",
33
"name": "MySQL",
44
"dockerRepository": "airbyte/destination-mysql",
5-
"dockerImageTag": "0.1.6",
5+
"dockerImageTag": "0.1.7",
66
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/mysql"
77
}

airbyte-config/init/src/main/resources/seed/destination_definitions.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@
5353
- destinationDefinitionId: ca81ee7c-3163-4246-af40-094cc31e5e42
5454
name: MySQL
5555
dockerRepository: airbyte/destination-mysql
56-
dockerImageTag: 0.1.6
56+
dockerImageTag: 0.1.7
5757
documentationUrl: https://docs.airbyte.io/integrations/destinations/mysql
5858
- destinationDefinitionId: d4353156-9217-4cad-8dd7-c108fd4f74cf
5959
name: MS SQL Server

airbyte-integrations/bases/base-normalization/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,5 +24,5 @@ WORKDIR /airbyte
2424
ENV AIRBYTE_ENTRYPOINT "/airbyte/entrypoint.sh"
2525
ENTRYPOINT ["/airbyte/entrypoint.sh"]
2626

27-
LABEL io.airbyte.version=0.1.35
27+
LABEL io.airbyte.version=0.1.36
2828
LABEL io.airbyte.name=airbyte/normalization

airbyte-integrations/bases/base-normalization/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,10 @@ task("customIntegrationTestPython", type: PythonTask, dependsOn: installTestReqs
2222

2323
dependsOn ':airbyte-integrations:bases:base-normalization:airbyteDocker'
2424
dependsOn ':airbyte-integrations:connectors:destination-bigquery:airbyteDocker'
25+
dependsOn ':airbyte-integrations:connectors:destination-mysql:airbyteDocker'
2526
dependsOn ':airbyte-integrations:connectors:destination-postgres:airbyteDocker'
2627
dependsOn ':airbyte-integrations:connectors:destination-redshift:airbyteDocker'
2728
dependsOn ':airbyte-integrations:connectors:destination-snowflake:airbyteDocker'
28-
dependsOn ':airbyte-integrations:connectors:destination-mysql:airbyteDocker'
2929
}
3030

3131
integrationTest.dependsOn("customIntegrationTestPython")

airbyte-integrations/bases/base-normalization/dbt-project-template/macros/cross_db_utils/type_conversions.sql

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,11 @@
3434
cast({{ field }} as boolean)
3535
{%- endmacro %}
3636

37+
{# -- MySQL does not support cast function converting string directly to boolean (an alias of tinyint(1), https://dev.mysql.com/doc/refman/8.0/en/cast-functions.html#function_cast #}
38+
{% macro mysql__cast_to_boolean(field) -%}
39+
IF(lower({{ field }}) = 'true', true, false)
40+
{%- endmacro %}
41+
3742
{# -- Redshift does not support converting string directly to boolean, it must go through int first #}
3843
{% macro redshift__cast_to_boolean(field) -%}
3944
cast(decode({{ field }}, 'true', '1', 'false', '0')::integer as boolean)

airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/DestinationAcceptanceTest.java

Lines changed: 25 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -419,7 +419,16 @@ public void testLineBreakCharacters() throws Exception {
419419

420420
@Test
421421
public void specNormalizationValueShouldBeCorrect() throws Exception {
422-
assertEquals(normalizationFromSpec(), supportsNormalization());
422+
final boolean normalizationFromSpec = normalizationFromSpec();
423+
assertEquals(normalizationFromSpec, supportsNormalization());
424+
boolean normalizationRunnerFactorySupportsDestinationImage;
425+
try {
426+
NormalizationRunnerFactory.create(getImageName(), processFactory);
427+
normalizationRunnerFactorySupportsDestinationImage = true;
428+
} catch (IllegalStateException e) {
429+
normalizationRunnerFactorySupportsDestinationImage = false;
430+
}
431+
assertEquals(normalizationFromSpec, normalizationRunnerFactorySupportsDestinationImage);
423432
}
424433

425434
@Test
@@ -666,11 +675,11 @@ protected int getMaxRecordValueLimit() {
666675
}
667676

668677
@Test
669-
void testCustomDbtTransformations() throws Exception {
678+
public void testCustomDbtTransformations() throws Exception {
670679
if (!normalizationFromSpec() || !dbtFromSpec()) {
671-
// TODO : Fix this, this test should not be restricted to destinations that support normalization
672-
// to do so, we need to inject extra packages for dbt to run with dbt community adapters depending
673-
// on the destination
680+
// we require normalization implementation for this destination, because we make sure to install
681+
// required dbt dependency in the normalization docker image in order to run this test successfully
682+
// (we don't actually rely on normalization running anything here though)
674683
return;
675684
}
676685

@@ -684,7 +693,7 @@ void testCustomDbtTransformations() throws Exception {
684693
final OperatorDbt dbtConfig = new OperatorDbt()
685694
.withGitRepoUrl("https://github.com/fishtown-analytics/jaffle_shop.git")
686695
.withGitRepoBranch("main")
687-
.withDockerImage("fishtownanalytics/dbt:0.19.1");
696+
.withDockerImage("airbyte/normalization:dev");
688697
//
689698
// jaffle_shop is a fictional ecommerce store maintained by fishtownanalytics/dbt.
690699
//
@@ -733,13 +742,10 @@ void testCustomDbtTransformations() throws Exception {
733742

734743
@Test
735744
void testCustomDbtTransformationsFailure() throws Exception {
736-
if (!normalizationFromSpec()) {
737-
// TODO : Fix this, this test should not be restricted to destinations that support normalization
738-
// to do so, we need to inject extra packages for dbt to run with dbt community adapters depending
739-
// on the destination
740-
return;
741-
}
742-
if (!dbtFromSpec()) {
745+
if (!normalizationFromSpec() || !dbtFromSpec()) {
746+
// we require normalization implementation for this destination, because we make sure to install
747+
// required dbt dependency in the normalization docker image in order to run this test successfully
748+
// (we don't actually rely on normalization running anything here though)
743749
return;
744750
}
745751

@@ -1002,11 +1008,16 @@ private void assertSameData(List<JsonNode> expected, List<JsonNode> actual) {
10021008
}
10031009
LOGGER.info("For {} Expected {} vs Actual {}", key, expectedValue, actualValue);
10041010
assertTrue(actualData.has(key));
1005-
assertEquals(expectedValue, actualValue);
1011+
assertSameValue(expectedValue, actualValue);
10061012
}
10071013
}
10081014
}
10091015

1016+
// Allows subclasses to implement custom comparison asserts
1017+
protected void assertSameValue(JsonNode expectedValue, JsonNode actualValue) {
1018+
assertEquals(expectedValue, actualValue);
1019+
}
1020+
10101021
protected List<AirbyteRecordMessage> retrieveNormalizedRecords(AirbyteCatalog catalog, String defaultSchema) throws Exception {
10111022
final List<AirbyteRecordMessage> actualMessages = new ArrayList<>();
10121023

airbyte-integrations/connectors/destination-mysql/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
88

99
RUN tar xf ${APPLICATION}.tar --strip-components=1
1010

11-
LABEL io.airbyte.version=0.1.6
11+
LABEL io.airbyte.version=0.1.7
1212
LABEL io.airbyte.name=airbyte/destination-mysql

airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLNameTransformer.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,16 @@
2626

2727
import io.airbyte.integrations.destination.ExtendedNameTransformer;
2828

29+
/**
30+
* Note that MySQL documentation discusses about identifiers case sensitivity using the
31+
* lower_case_table_names system variable. As one of their recommendation is: "It is best to adopt a
32+
* consistent convention, such as always creating and referring to databases and tables using
33+
* lowercase names. This convention is recommended for maximum portability and ease of use.
34+
*
35+
* Source: https://dev.mysql.com/doc/refman/8.0/en/identifier-case-sensitivity.html"
36+
*
37+
* As a result, we are here forcing all identifier (table, schema and columns) names to lowercase.
38+
*/
2939
public class MySQLNameTransformer extends ExtendedNameTransformer {
3040

3141
// These constants must match those in destination_name_transformer.py
@@ -39,19 +49,19 @@ public class MySQLNameTransformer extends ExtendedNameTransformer {
3949

4050
@Override
4151
public String getIdentifier(String name) {
42-
String identifier = super.getIdentifier(name);
52+
String identifier = applyDefaultCase(super.getIdentifier(name));
4353
return truncateName(identifier, TRUNCATION_MAX_NAME_LENGTH);
4454
}
4555

4656
@Override
4757
public String getTmpTableName(String streamName) {
48-
String tmpTableName = super.getTmpTableName(streamName);
58+
String tmpTableName = applyDefaultCase(super.getTmpTableName(streamName));
4959
return truncateName(tmpTableName, TRUNCATION_MAX_NAME_LENGTH);
5060
}
5161

5262
@Override
5363
public String getRawTableName(String streamName) {
54-
String rawTableName = super.getRawTableName(streamName);
64+
String rawTableName = applyDefaultCase(super.getRawTableName(streamName));
5565
return truncateName(rawTableName, TRUNCATION_MAX_NAME_LENGTH);
5666
}
5767

airbyte-integrations/connectors/destination-mysql/src/main/resources/spec.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/mysql",
33
"supportsIncremental": true,
4-
"supportsNormalization": false,
4+
"supportsNormalization": true,
55
"supportsDBT": true,
66
"supported_destination_sync_modes": ["overwrite", "append"],
77
"connectionSpecification": {

airbyte-integrations/connectors/destination-mysql/src/test-integration/java/io/airbyte/integrations/destination/mysql/MySQLDestinationAcceptanceTest.java

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424

2525
package io.airbyte.integrations.destination.mysql;
2626

27+
import static org.junit.jupiter.api.Assertions.assertEquals;
28+
2729
import com.fasterxml.jackson.databind.JsonNode;
2830
import com.google.common.collect.ImmutableMap;
2931
import io.airbyte.commons.json.Jsons;
@@ -32,6 +34,7 @@
3234
import io.airbyte.integrations.destination.ExtendedNameTransformer;
3335
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
3436
import java.sql.SQLException;
37+
import java.util.ArrayList;
3538
import java.util.List;
3639
import java.util.stream.Collectors;
3740
import org.jooq.JSONFormat;
@@ -45,7 +48,7 @@ public class MySQLDestinationAcceptanceTest extends DestinationAcceptanceTest {
4548
private static final JSONFormat JSON_FORMAT = new JSONFormat().recordFormat(RecordFormat.OBJECT);
4649

4750
private MySQLContainer<?> db;
48-
private ExtendedNameTransformer namingResolver = new MySQLNameTransformer();
51+
private final ExtendedNameTransformer namingResolver = new MySQLNameTransformer();
4952

5053
@Override
5154
protected String getImageName() {
@@ -62,6 +65,11 @@ protected boolean implementsNamespaces() {
6265
return true;
6366
}
6467

68+
@Override
69+
protected boolean supportsNormalization() {
70+
return true;
71+
}
72+
6573
@Override
6674
protected JsonNode getConfig() {
6775
return Jsons.jsonNode(ImmutableMap.builder()
@@ -123,6 +131,25 @@ private List<JsonNode> retrieveRecordsFromTable(String tableName, String schemaN
123131
.collect(Collectors.toList()));
124132
}
125133

134+
@Override
135+
protected List<JsonNode> retrieveNormalizedRecords(TestDestinationEnv testEnv, String streamName, String namespace) throws Exception {
136+
String tableName = namingResolver.getIdentifier(streamName);
137+
String schema = namingResolver.getIdentifier(namespace);
138+
return retrieveRecordsFromTable(tableName, schema);
139+
}
140+
141+
@Override
142+
protected List<String> resolveIdentifier(String identifier) {
143+
final List<String> result = new ArrayList<>();
144+
final String resolved = namingResolver.getIdentifier(identifier);
145+
result.add(identifier);
146+
result.add(resolved);
147+
if (!resolved.startsWith("\"")) {
148+
result.add(resolved.toLowerCase());
149+
}
150+
return result;
151+
}
152+
126153
@Override
127154
protected void setup(TestDestinationEnv testEnv) {
128155
db = new MySQLContainer<>("mysql:8.0");
@@ -141,7 +168,7 @@ private void revokeAllPermissions() {
141168
}
142169

143170
private void grantCorrectPermissions() {
144-
executeQuery("GRANT CREATE, INSERT, SELECT, DROP ON *.* TO " + db.getUsername() + "@'%';");
171+
executeQuery("GRANT ALTER, CREATE, INSERT, SELECT, DROP ON *.* TO " + db.getUsername() + "@'%';");
145172
}
146173

147174
private void executeQuery(String query) {
@@ -168,10 +195,28 @@ protected void tearDown(TestDestinationEnv testEnv) {
168195
db.close();
169196
}
170197

198+
@Override
199+
@Test
200+
public void testCustomDbtTransformations() throws Exception {
201+
// We need to create view for testing custom dbt transformations
202+
executeQuery("GRANT CREATE VIEW ON *.* TO " + db.getUsername() + "@'%';");
203+
// overrides test with a no-op until https://github.com/dbt-labs/jaffle_shop/pull/8 is merged
204+
// super.testCustomDbtTransformations();
205+
}
206+
171207
@Override
172208
@Test
173209
public void testLineBreakCharacters() {
174210
// overrides test with a no-op until we handle full UTF-8 in the destination
175211
}
176212

213+
protected void assertSameValue(JsonNode expectedValue, JsonNode actualValue) {
214+
if (expectedValue.isBoolean()) {
215+
// Boolean in MySQL are stored as TINYINT (0 or 1) so we force them to boolean values here
216+
assertEquals(expectedValue.asBoolean(), actualValue.asBoolean());
217+
} else {
218+
assertEquals(expectedValue, actualValue);
219+
}
220+
}
221+
177222
}

airbyte-workers/src/main/java/io/airbyte/workers/normalization/DefaultNormalizationRunner.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public class DefaultNormalizationRunner implements NormalizationRunner {
4747

4848
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultNormalizationRunner.class);
4949

50-
public static final String NORMALIZATION_IMAGE_NAME = "airbyte/normalization:0.1.35";
50+
public static final String NORMALIZATION_IMAGE_NAME = "airbyte/normalization:0.1.36";
5151

5252
private final DestinationType destinationType;
5353
private final ProcessFactory processFactory;

docs/integrations/destinations/mysql.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,23 @@ You should now have all the requirements needed to configure MySQL as a destinat
5656
* **Password**
5757
* **Database**
5858

59+
## Known limitations
60+
61+
Note that MySQL documentation discusses identifiers case sensitivity using the `lower_case_table_names` system variable.
62+
One of their recommendations is:
63+
64+
"It is best to adopt a consistent convention, such as always creating and referring to databases and tables using lowercase names.
65+
This convention is recommended for maximum portability and ease of use."
66+
67+
[Source: MySQL docs](https://dev.mysql.com/doc/refman/8.0/en/identifier-case-sensitivity.html)
68+
69+
As a result, Airbyte MySQL destination forces all identifier (table, schema and columns) names to be lowercase.
70+
5971
## CHANGELOG
6072

6173
| Version | Date | Pull Request | Subject |
6274
| :--- | :--- | :--- | :--- |
75+
| 0.1.7 | 2021-07-09 | [#4651](https://github.com/airbytehq/airbyte/pull/4651) | Switch normalization flag on so users can use normalization. |
6376
| 0.1.6 | 2021-07-03 | [#4531](https://github.com/airbytehq/airbyte/pull/4531) | Added normalization for MySQL. |
6477
| 0.1.5 | 2021-07-03 | [#3973](https://github.com/airbytehq/airbyte/pull/3973) | Added `AIRBYTE_ENTRYPOINT` for kubernetes support. |
6578
| 0.1.4 | 2021-07-03 | [#3290](https://github.com/airbytehq/airbyte/pull/3290) | Switched to get states from destination instead of source. |

0 commit comments

Comments
 (0)