-
Notifications
You must be signed in to change notification settings - Fork 4.6k
JDBC Sources: validate actual source schema #21844
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
eb60a4f
4afd2a1
f147355
b19641f
dc1cfe4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,6 +5,7 @@ | |
package io.airbyte.integrations.source.relationaldb; | ||
|
||
import static io.airbyte.integrations.base.errors.messages.ErrorMessage.getErrorMessage; | ||
import static io.airbyte.protocol.models.v0.CatalogHelpers.fieldsToJsonSchema; | ||
|
||
import com.fasterxml.jackson.databind.JsonNode; | ||
import com.google.common.base.Preconditions; | ||
|
@@ -161,6 +162,8 @@ public AutoCloseableIterator<AirbyteMessage> read(final JsonNode config, | |
|
||
validateCursorFieldForIncrementalTables(fullyQualifiedTableNameToInfo, catalog, database); | ||
|
||
validateSourceSchema(fullyQualifiedTableNameToInfo, catalog, database); | ||
|
||
final List<AutoCloseableIterator<AirbyteMessage>> incrementalIterators = | ||
getIncrementalIterators(database, catalog, fullyQualifiedTableNameToInfo, stateManager, | ||
emittedAt); | ||
|
@@ -180,6 +183,35 @@ public AutoCloseableIterator<AirbyteMessage> read(final JsonNode config, | |
}); | ||
} | ||
|
||
private void validateSourceSchema(Map<String, TableInfo<CommonField<DataType>>> fullyQualifiedTableNameToInfo, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this should be changed to be named validate implies that if it isn't the same you'd be throwing an error There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also add a quick comment on what this function is doing There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
renamed this method and added a comment |
||
ConfiguredAirbyteCatalog catalog, | ||
Database database) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can remove passing in the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
removed |
||
for (final ConfiguredAirbyteStream airbyteStream : catalog.getStreams()) { | ||
final AirbyteStream stream = airbyteStream.getStream(); | ||
final String fullyQualifiedTableName = getFullyQualifiedTableName(stream.getNamespace(), | ||
stream.getName()); | ||
if (!fullyQualifiedTableNameToInfo.containsKey(fullyQualifiedTableName)) { | ||
akashkulk marked this conversation as resolved.
Show resolved
Hide resolved
|
||
continue; | ||
} | ||
final TableInfo<CommonField<DataType>> table = fullyQualifiedTableNameToInfo.get(fullyQualifiedTableName); | ||
final List<Field> fields = table.getFields() | ||
.stream() | ||
.map(this::toField) | ||
.distinct() | ||
.collect(Collectors.toList()); | ||
final JsonNode currentJsonSchema = fieldsToJsonSchema(fields); | ||
|
||
final JsonNode catalogSchema = stream.getJsonSchema(); | ||
if (!catalogSchema.equals(currentJsonSchema)) { | ||
akashkulk marked this conversation as resolved.
Show resolved
Hide resolved
|
||
LOGGER.warn( | ||
"The underlying schema changed for the table. Please refresh your source schema! Source schema changed for table {}! Actual schema: {}. Catalog schema: {}", | ||
fullyQualifiedTableName, | ||
currentJsonSchema, | ||
catalogSchema); | ||
} | ||
} | ||
} | ||
|
||
private void validateCursorFieldForIncrementalTables( | ||
final Map<String, TableInfo<CommonField<DataType>>> tableNameToTable, | ||
final ConfiguredAirbyteCatalog catalog, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure I understand what's being tested in this test: IIUC this test is verifying that if the underlying schema is changed, the records read will be different
But won't this always be true? Across two different
reads
, even with without the changes in this PR the read will always return different recordsThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated this test with 4 different reads with the same catalog:
1 read - initial sync (integers)
2 read - nothing changed ==> values are same as in 1 read (integers)
3 read - added 1 record into the source table ==> added 1 airbyte record (integers)
4 read - user changed column definition from int to float ==> values in airbyte records become double
This test fully emulate behaviour from this issue
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But you aren't actually checking for the actual change you're making here, correct? i.e. you aren't verifying the correct logs are being outputted.
In that case, I think you should remove this test altogether since that change is a logging one (and not a validation)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed the test