Skip to content

Commit b957dfa

Browse files
JDBC Sources: validate actual source schema (airbytehq#21844)
* JDBC Sources: validate actual source schema * add unit test * updated test cases * refactoring
1 parent cab9a4f commit b957dfa

File tree

1 file changed

+33
-0
lines changed
  • airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb

1 file changed

+33
-0
lines changed

airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package io.airbyte.integrations.source.relationaldb;
66

77
import static io.airbyte.integrations.base.errors.messages.ErrorMessage.getErrorMessage;
8+
import static io.airbyte.protocol.models.v0.CatalogHelpers.fieldsToJsonSchema;
89

910
import com.fasterxml.jackson.databind.JsonNode;
1011
import com.google.common.base.Preconditions;
@@ -161,6 +162,8 @@ public AutoCloseableIterator<AirbyteMessage> read(final JsonNode config,
161162

162163
validateCursorFieldForIncrementalTables(fullyQualifiedTableNameToInfo, catalog, database);
163164

165+
logSourceSchemaChange(fullyQualifiedTableNameToInfo, catalog);
166+
164167
final List<AutoCloseableIterator<AirbyteMessage>> incrementalIterators =
165168
getIncrementalIterators(database, catalog, fullyQualifiedTableNameToInfo, stateManager,
166169
emittedAt);
@@ -180,6 +183,36 @@ public AutoCloseableIterator<AirbyteMessage> read(final JsonNode config,
180183
});
181184
}
182185

186+
// in case of user manually modified source table schema but did not refresh it and save into the
187+
// catalog - it can lead to sync failure. This method compare actual schema vs catalog schema
188+
private void logSourceSchemaChange(Map<String, TableInfo<CommonField<DataType>>> fullyQualifiedTableNameToInfo,
189+
ConfiguredAirbyteCatalog catalog) {
190+
for (final ConfiguredAirbyteStream airbyteStream : catalog.getStreams()) {
191+
final AirbyteStream stream = airbyteStream.getStream();
192+
final String fullyQualifiedTableName = getFullyQualifiedTableName(stream.getNamespace(),
193+
stream.getName());
194+
if (!fullyQualifiedTableNameToInfo.containsKey(fullyQualifiedTableName)) {
195+
continue;
196+
}
197+
final TableInfo<CommonField<DataType>> table = fullyQualifiedTableNameToInfo.get(fullyQualifiedTableName);
198+
final List<Field> fields = table.getFields()
199+
.stream()
200+
.map(this::toField)
201+
.distinct()
202+
.collect(Collectors.toList());
203+
final JsonNode currentJsonSchema = fieldsToJsonSchema(fields);
204+
205+
final JsonNode catalogSchema = stream.getJsonSchema();
206+
if (!catalogSchema.equals(currentJsonSchema)) {
207+
LOGGER.warn(
208+
"Source schema changed for table {}! Actual schema: {}. Catalog schema: {}",
209+
fullyQualifiedTableName,
210+
currentJsonSchema,
211+
catalogSchema);
212+
}
213+
}
214+
}
215+
183216
private void validateCursorFieldForIncrementalTables(
184217
final Map<String, TableInfo<CommonField<DataType>>> tableNameToTable,
185218
final ConfiguredAirbyteCatalog catalog,

0 commit comments

Comments
 (0)