Skip to content

Commit f5d0c23

Browse files
subodh1810jhammarstedt
authored andcommitted
improve error message for tables with invalid columns as cursor (airbytehq#15317)
* implement validation for cursor type before reading data * rename class * add test * fix merge conflicts * address review comments * fix test
1 parent 904f096 commit f5d0c23

File tree

5 files changed

+127
-0
lines changed

5 files changed

+127
-0
lines changed

airbyte-db/db-lib/src/main/java/io/airbyte/db/IncrementalUtils.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
88
import io.airbyte.protocol.models.JsonSchemaPrimitive;
9+
import java.util.Optional;
910

1011
public class IncrementalUtils {
1112

@@ -22,6 +23,14 @@ public static String getCursorField(final ConfiguredAirbyteStream stream) {
2223
}
2324
}
2425

26+
public static Optional<String> getCursorFieldOptional(final ConfiguredAirbyteStream stream) {
27+
try {
28+
return Optional.ofNullable(getCursorField(stream));
29+
} catch (IllegalStateException e) {
30+
return Optional.empty();
31+
}
32+
}
33+
2534
public static JsonSchemaPrimitive getCursorType(final ConfiguredAirbyteStream stream, final String cursorField) {
2635
if (stream.getStream().getJsonSchema().get(PROPERTIES) == null) {
2736
throw new IllegalStateException(String.format("No properties found in stream: %s.", stream.getStream().getName()));

airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -371,6 +371,11 @@ protected DataSource createDataSource(final JsonNode config) {
371371
return dataSource;
372372
}
373373

374+
@Override
375+
protected boolean isValidCursorType(final Datatype cursorType) {
376+
return sourceOperations.isCursorType(cursorType);
377+
}
378+
374379
@Override
375380
public JdbcDatabase createDatabase(final JsonNode config) throws SQLException {
376381
final DataSource dataSource = createDataSource(config);

airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
import static io.airbyte.integrations.source.postgres.utils.PostgresUnitTestsUtil.createRecord;
88
import static io.airbyte.integrations.source.postgres.utils.PostgresUnitTestsUtil.map;
99
import static io.airbyte.integrations.source.postgres.utils.PostgresUnitTestsUtil.setEmittedAtToNull;
10+
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
11+
import static org.assertj.core.api.AssertionsForClassTypes.catchThrowable;
1012
import static org.junit.jupiter.api.Assertions.assertEquals;
1113
import static org.junit.jupiter.api.Assertions.assertFalse;
1214
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -24,16 +26,20 @@
2426
import io.airbyte.db.factory.DSLContextFactory;
2527
import io.airbyte.db.factory.DatabaseDriver;
2628
import io.airbyte.db.jdbc.JdbcUtils;
29+
import io.airbyte.integrations.source.relationaldb.InvalidCursorException;
2730
import io.airbyte.protocol.models.AirbyteCatalog;
2831
import io.airbyte.protocol.models.AirbyteMessage;
2932
import io.airbyte.protocol.models.AirbyteStream;
3033
import io.airbyte.protocol.models.CatalogHelpers;
3134
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
35+
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
36+
import io.airbyte.protocol.models.DestinationSyncMode;
3237
import io.airbyte.protocol.models.Field;
3338
import io.airbyte.protocol.models.JsonSchemaType;
3439
import io.airbyte.protocol.models.SyncMode;
3540
import io.airbyte.test.utils.PostgreSQLContainerHelper;
3641
import java.math.BigDecimal;
42+
import java.sql.SQLException;
3743
import java.util.Collections;
3844
import java.util.List;
3945
import java.util.Map;
@@ -503,4 +509,45 @@ void testGetUsername() {
503509
assertEquals(username, PostgresSource.getUsername(azureConfig));
504510
}
505511

512+
513+
@Test
514+
public void tableWithInvalidCursorShouldThrowException() throws Exception {
515+
try (final PostgreSQLContainer<?> db = new PostgreSQLContainer<>("postgres:13-alpine")) {
516+
db.start();
517+
final JsonNode config = getConfig(db);
518+
try (final DSLContext dslContext = getDslContext(config)) {
519+
final Database database = new Database(dslContext);
520+
final ConfiguredAirbyteStream tableWithInvalidCursorType = createTableWithInvalidCursorType(database);
521+
final ConfiguredAirbyteCatalog configuredAirbyteCatalog = new ConfiguredAirbyteCatalog().withStreams(Collections.singletonList(tableWithInvalidCursorType));
522+
523+
final Throwable throwable = catchThrowable(() -> MoreIterators.toSet(new PostgresSource().read(config, configuredAirbyteCatalog, null)));
524+
assertThat(throwable).isInstanceOf(InvalidCursorException.class)
525+
.hasMessageContaining(
526+
"The following tables have invalid columns selected as cursor, please select a column with a well-defined ordering as a cursor. {tableName='public.test_table', cursorColumnName='id', cursorSqlType=OTHER}");
527+
} finally {
528+
db.stop();
529+
}
530+
}
531+
}
532+
533+
private ConfiguredAirbyteStream createTableWithInvalidCursorType(final Database database) throws SQLException {
534+
database.query(ctx -> {
535+
ctx.fetch("CREATE EXTENSION IF NOT EXISTS \"uuid-ossp\";");
536+
ctx.fetch("CREATE TABLE IF NOT EXISTS public.test_table(id uuid PRIMARY KEY DEFAULT uuid_generate_v4());");
537+
return null;
538+
});
539+
540+
return new ConfiguredAirbyteStream().withSyncMode(SyncMode.INCREMENTAL)
541+
.withCursorField(Lists.newArrayList("id"))
542+
.withDestinationSyncMode(DestinationSyncMode.APPEND)
543+
.withSyncMode(SyncMode.INCREMENTAL)
544+
.withStream(CatalogHelpers.createAirbyteStream(
545+
"test_table",
546+
"public",
547+
Field.of("id", JsonSchemaType.STRING))
548+
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
549+
.withSourceDefinedPrimaryKey(List.of(List.of("id"))));
550+
551+
}
552+
506553
}

airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
2828
import io.airbyte.integrations.base.AirbyteTraceMessageUtility;
2929
import io.airbyte.integrations.base.Source;
30+
import io.airbyte.integrations.source.relationaldb.InvalidCursorException.InvalidCursorInfo;
3031
import io.airbyte.integrations.source.relationaldb.models.DbState;
3132
import io.airbyte.integrations.source.relationaldb.state.StateManager;
3233
import io.airbyte.integrations.source.relationaldb.state.StateManagerFactory;
@@ -54,6 +55,7 @@
5455
import java.util.Collections;
5556
import java.util.List;
5657
import java.util.Map;
58+
import java.util.Objects;
5759
import java.util.Optional;
5860
import java.util.Set;
5961
import java.util.concurrent.atomic.AtomicLong;
@@ -146,6 +148,8 @@ public AutoCloseableIterator<AirbyteMessage> read(final JsonNode config,
146148
.collect(Collectors.toMap(t -> String.format("%s.%s", t.getNameSpace(), t.getName()), Function
147149
.identity()));
148150

151+
validateCursorFieldForIncrementalTables(fullyQualifiedTableNameToInfo, catalog);
152+
149153
final List<AutoCloseableIterator<AirbyteMessage>> incrementalIterators =
150154
getIncrementalIterators(database, catalog, fullyQualifiedTableNameToInfo, stateManager, emittedAt);
151155
final List<AutoCloseableIterator<AirbyteMessage>> fullRefreshIterators =
@@ -163,6 +167,42 @@ public AutoCloseableIterator<AirbyteMessage> read(final JsonNode config,
163167
});
164168
}
165169

170+
private void validateCursorFieldForIncrementalTables(final Map<String, TableInfo<CommonField<DataType>>> tableNameToTable, final ConfiguredAirbyteCatalog catalog) {
171+
final List<InvalidCursorInfo> tablesWithInvalidCursor = new ArrayList<>();
172+
for (final ConfiguredAirbyteStream airbyteStream : catalog.getStreams()) {
173+
final AirbyteStream stream = airbyteStream.getStream();
174+
final String fullyQualifiedTableName = getFullyQualifiedTableName(stream.getNamespace(),
175+
stream.getName());
176+
final boolean hasSourceDefinedCursor =
177+
!Objects.isNull(airbyteStream.getStream().getSourceDefinedCursor()) && airbyteStream.getStream().getSourceDefinedCursor();
178+
if (!tableNameToTable.containsKey(fullyQualifiedTableName) || airbyteStream.getSyncMode() != SyncMode.INCREMENTAL || hasSourceDefinedCursor) {
179+
continue;
180+
}
181+
182+
final TableInfo<CommonField<DataType>> table = tableNameToTable
183+
.get(fullyQualifiedTableName);
184+
final Optional<String> cursorField = IncrementalUtils.getCursorFieldOptional(airbyteStream);
185+
if (cursorField.isEmpty()) {
186+
continue;
187+
}
188+
final DataType cursorType = table.getFields().stream()
189+
.filter(info -> info.getName().equals(cursorField.get()))
190+
.map(CommonField::getType)
191+
.findFirst()
192+
.orElseThrow();
193+
194+
if (!isValidCursorType(cursorType)) {
195+
tablesWithInvalidCursor.add(new InvalidCursorInfo(fullyQualifiedTableName, cursorField.get(), cursorType.toString()));
196+
}
197+
}
198+
199+
if (!tablesWithInvalidCursor.isEmpty()) {
200+
throw new InvalidCursorException(tablesWithInvalidCursor);
201+
}
202+
}
203+
204+
protected abstract boolean isValidCursorType(final DataType cursorType);
205+
166206
protected List<TableInfo<CommonField<DataType>>> discoverWithoutSystemTables(final Database database) throws Exception {
167207
final Set<String> systemNameSpaces = getExcludedInternalNameSpaces();
168208
final List<TableInfo<CommonField<DataType>>> discoveredTables = discoverInternal(database);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package io.airbyte.integrations.source.relationaldb;
2+
3+
import java.util.List;
4+
import java.util.stream.Collectors;
5+
6+
public class InvalidCursorException extends RuntimeException {
7+
8+
public InvalidCursorException(final List<InvalidCursorInfo> tablesWithInvalidCursor) {
9+
super("The following tables have invalid columns selected as cursor, please select a column with a well-defined ordering as a cursor. " + tablesWithInvalidCursor.stream().map(InvalidCursorInfo::toString)
10+
.collect(Collectors.joining(",")));
11+
}
12+
13+
public record InvalidCursorInfo(String tableName, String cursorColumnName, String cursorSqlType) {
14+
15+
@Override
16+
public String toString() {
17+
return "{" +
18+
"tableName='" + tableName + '\'' +
19+
", cursorColumnName='" + cursorColumnName + '\'' +
20+
", cursorSqlType=" + cursorSqlType +
21+
'}';
22+
}
23+
}
24+
25+
26+
}

0 commit comments

Comments
 (0)