Skip to content

JDBC Sources: validate actual source schema #21844

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import static org.assertj.core.api.AssertionsForClassTypes.catchThrowable;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.fasterxml.jackson.databind.JsonNode;
Expand Down Expand Up @@ -64,6 +65,7 @@ class PostgresSourceTest {
private static final String STREAM_NAME = "id_and_name";
private static final String STREAM_NAME_PRIVILEGES_TEST_CASE = "id_and_name_3";
private static final String STREAM_NAME_PRIVILEGES_TEST_CASE_VIEW = "id_and_name_3_view";
private static final String STREAM_NAME_CHANGED_SOURCE_SHEMA = "test_source_schema";
private static final AirbyteCatalog CATALOG = new AirbyteCatalog().withStreams(List.of(
CatalogHelpers.createAirbyteStream(
STREAM_NAME,
Expand Down Expand Up @@ -95,6 +97,13 @@ class PostgresSourceTest {
Field.of("name", JsonSchemaType.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of("id"))),
CatalogHelpers.createAirbyteStream(
STREAM_NAME_CHANGED_SOURCE_SHEMA,
SCHEMA_NAME,
Field.of("id", JsonSchemaType.INTEGER),
Field.of("test_column", JsonSchemaType.INTEGER))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of("id"))),
CatalogHelpers.createAirbyteStream(
STREAM_NAME_PRIVILEGES_TEST_CASE_VIEW,
SCHEMA_NAME,
Expand Down Expand Up @@ -253,6 +262,45 @@ public void testCanReadUtf8() throws Exception {
}
}

@Test
public void testValuesChangedOnChangedSourceSchema() throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I understand what's being tested in this test: IIUC this test is verifying that if the underlying schema is changed, the records read will be different

But won't this always be true? Across two different reads, even with without the changes in this PR the read will always return different records

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I understand what's being tested in this test: IIUC this test is verifying that if the underlying schema is changed, the records read will be different

But won't this always be true? Across two different reads, even with without the changes in this PR the read will always return different records

updated this test with 4 different reads with the same catalog:
1 read - initial sync (integers)
2 read - nothing changed ==> values are same as in 1 read (integers)
3 read - added 1 record into the source table ==> added 1 airbyte record (integers)
4 read - user changed column definition from int to float ==> values in airbyte records become double

This test fully emulate behaviour from this issue

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But you aren't actually checking for the actual change you're making here, correct? i.e. you aren't verifying the correct logs are being outputted.

In that case, I think you should remove this test altogether since that change is a logging one (and not a validation)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But you aren't actually checking for the actual change you're making here, correct? i.e. you aren't verifying the correct logs are being outputted.

In that case, I think you should remove this test altogether since that change is a logging one (and not a validation)

removed the test


try (final PostgreSQLContainer<?> db = new PostgreSQLContainer<>("postgres:13-alpine")) {
db.start();
final PostgresSource postgresSource = new PostgresSource();
final JsonNode config = getConfig(db);

try (final DSLContext dslContext = getDslContext(config)) {
final Database database = getDatabase(dslContext);
database.query(ctx -> {
ctx.fetch("CREATE TABLE test_source_schema(id INTEGER, test_column int);");
ctx.fetch("INSERT INTO test_source_schema(id, test_column) VALUES (1, 20.0), (2, 1.0);");
return null;
});
}

final Set<AirbyteMessage> actualMessages1read = MoreIterators.toSet(postgresSource.read(config, CONFIGURED_CATALOG, null));
// assert read is successful if source schema was not changed
assertEquals(2, actualMessages1read.size());

try (final DSLContext dslContext = getDslContext(config)) {
final Database database = getDatabase(dslContext);
database.query(ctx -> {
ctx.fetch("ALTER TABLE test_source_schema ALTER COLUMN test_column type float using test_column::float;;");
return null;
});
}

final Set<AirbyteMessage> actualMessages2read = MoreIterators.toSet(postgresSource.read(config, CONFIGURED_CATALOG, null));
final List<JsonNode> values1read = actualMessages1read.stream().map(x -> x.getRecord().getData().get("test_column")).toList();
final List<JsonNode> values2read = actualMessages2read.stream().map(x -> x.getRecord().getData().get("test_column")).toList();

// assert values are changed if the source schema was changed
assertNotEquals(values2read, values1read);
db.stop();
}
}

@Test
void testUserDoesntHasPrivilegesToSelectTable() throws Exception {
try (final PostgreSQLContainer<?> db = new PostgreSQLContainer<>("postgres:13-alpine")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.integrations.source.relationaldb;

import static io.airbyte.integrations.base.errors.messages.ErrorMessage.getErrorMessage;
import static io.airbyte.protocol.models.v0.CatalogHelpers.fieldsToJsonSchema;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -161,6 +162,8 @@ public AutoCloseableIterator<AirbyteMessage> read(final JsonNode config,

validateCursorFieldForIncrementalTables(fullyQualifiedTableNameToInfo, catalog, database);

validateSourceSchema(fullyQualifiedTableNameToInfo, catalog, database);

final List<AutoCloseableIterator<AirbyteMessage>> incrementalIterators =
getIncrementalIterators(database, catalog, fullyQualifiedTableNameToInfo, stateManager,
emittedAt);
Expand All @@ -180,6 +183,35 @@ public AutoCloseableIterator<AirbyteMessage> read(final JsonNode config,
});
}

private void validateSourceSchema(Map<String, TableInfo<CommonField<DataType>>> fullyQualifiedTableNameToInfo,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be changed to be named logSourceSchemaChange

validate implies that if it isn't the same you'd be throwing an error

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also add a quick comment on what this function is doing

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also add a quick comment on what this function is doing

renamed this method and added a comment

ConfiguredAirbyteCatalog catalog,
Database database) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can remove passing in the Database parameter. It's not being used

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can remove passing in the Database parameter. It's not being used

removed

for (final ConfiguredAirbyteStream airbyteStream : catalog.getStreams()) {
final AirbyteStream stream = airbyteStream.getStream();
final String fullyQualifiedTableName = getFullyQualifiedTableName(stream.getNamespace(),
stream.getName());
if (!fullyQualifiedTableNameToInfo.containsKey(fullyQualifiedTableName)) {
continue;
}
final TableInfo<CommonField<DataType>> table = fullyQualifiedTableNameToInfo.get(fullyQualifiedTableName);
final List<Field> fields = table.getFields()
.stream()
.map(this::toField)
.distinct()
.collect(Collectors.toList());
final JsonNode currentJsonSchema = fieldsToJsonSchema(fields);

final JsonNode catalogSchema = stream.getJsonSchema();
if (!catalogSchema.equals(currentJsonSchema)) {
LOGGER.warn(
"The underlying schema changed for the table. Please refresh your source schema! Source schema changed for table {}! Actual schema: {}. Catalog schema: {}",
fullyQualifiedTableName,
currentJsonSchema,
catalogSchema);
}
}
}

private void validateCursorFieldForIncrementalTables(
final Map<String, TableInfo<CommonField<DataType>>> tableNameToTable,
final ConfiguredAirbyteCatalog catalog,
Expand Down