Skip to content

Commit 9ae6e7f

Browse files
committed
Add order by clause in incremental query
1 parent 8aa9ab4 commit 9ae6e7f

File tree

5 files changed

+17
-22
lines changed

5 files changed

+17
-22
lines changed

airbyte-integrations/connectors/source-bigquery/src/main/java/io/airbyte/integrations/source/bigquery/BigQuerySource.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -138,12 +138,12 @@ public AutoCloseableIterator<JsonNode> queryTableIncremental(final BigQueryDatab
138138
final String tableName,
139139
final String cursorField,
140140
final StandardSQLTypeName cursorFieldType,
141-
final String cursor) {
141+
final String cursorValue) {
142142
return queryTableWithParams(database, String.format("SELECT %s FROM %s WHERE %s > ?",
143143
enquoteIdentifierList(columnNames),
144144
getFullTableName(schemaName, tableName),
145145
cursorField),
146-
sourceOperations.getQueryParameter(cursorFieldType, cursor));
146+
sourceOperations.getQueryParameter(cursorFieldType, cursorValue));
147147
}
148148

149149
private AutoCloseableIterator<JsonNode> queryTableWithParams(final BigQueryDatabase database,

airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java

+7-4
Original file line numberDiff line numberDiff line change
@@ -266,21 +266,24 @@ public AutoCloseableIterator<JsonNode> queryTableIncremental(final JdbcDatabase
266266
final String tableName,
267267
final String cursorField,
268268
final Datatype cursorFieldType,
269-
final String cursor) {
269+
final String cursorValue) {
270270
LOGGER.info("Queueing query for table: {}", tableName);
271271
return AutoCloseableIterators.lazyIterator(() -> {
272272
try {
273273
final Stream<JsonNode> stream = database.unsafeQuery(
274274
connection -> {
275275
LOGGER.info("Preparing query for table: {}", tableName);
276-
final String sql = String.format("SELECT %s FROM %s WHERE %s > ?",
276+
final String quotedCursorField = sourceOperations.enquoteIdentifier(connection, cursorField);
277+
final String sql = String.format("SELECT %s FROM %s WHERE %s > ? ORDER BY %s ASC",
277278
sourceOperations.enquoteIdentifierList(connection, columnNames),
278279
sourceOperations
279280
.getFullyQualifiedTableNameWithQuoting(connection, schemaName, tableName),
280-
sourceOperations.enquoteIdentifier(connection, cursorField));
281+
quotedCursorField,
282+
quotedCursorField
283+
);
281284

282285
final PreparedStatement preparedStatement = connection.prepareStatement(sql);
283-
sourceOperations.setStatementField(preparedStatement, 1, cursorFieldType, cursor);
286+
sourceOperations.setStatementField(preparedStatement, 1, cursorFieldType, cursorValue);
284287
LOGGER.info("Executing query for table: {}", tableName);
285288
return preparedStatement;
286289
},

airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io.airbyte.integrations.source.mongodb/MongoDbSource.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -186,8 +186,8 @@ public AutoCloseableIterator<JsonNode> queryTableIncremental(final MongoDatabase
186186
final String tableName,
187187
final String cursorField,
188188
final BsonType cursorFieldType,
189-
final String cursor) {
190-
final Bson greaterComparison = gt(cursorField, MongoUtils.getBsonValue(cursorFieldType, cursor));
189+
final String cursorValue) {
190+
final Bson greaterComparison = gt(cursorField, MongoUtils.getBsonValue(cursorFieldType, cursorValue));
191191
return queryTable(database, columnNames, tableName, greaterComparison);
192192
}
193193

airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ public AutoCloseableIterator<JsonNode> queryTableIncremental(final JdbcDatabase
9797
final String tableName,
9898
final String cursorField,
9999
final JDBCType cursorFieldType,
100-
final String cursor) {
100+
final String cursorValue) {
101101
LOGGER.info("Queueing query for table: {}", tableName);
102102
return AutoCloseableIterators.lazyIterator(() -> {
103103
try {
@@ -115,7 +115,7 @@ public AutoCloseableIterator<JsonNode> queryTableIncremental(final JdbcDatabase
115115
LOGGER.info("Prepared SQL query for queryTableIncremental is: " + sql);
116116

117117
final PreparedStatement preparedStatement = connection.prepareStatement(sql);
118-
sourceOperations.setStatementField(preparedStatement, 1, cursorFieldType, cursor);
118+
sourceOperations.setStatementField(preparedStatement, 1, cursorFieldType, cursorValue);
119119
LOGGER.info("Executing query for table: {}", tableName);
120120
return preparedStatement;
121121
},
@@ -279,8 +279,8 @@ protected void assertCdcSchemaQueryable(final JsonNode config, final JdbcDatabas
279279
final List<JsonNode> queryResponse = database.queryJsons(connection -> {
280280
boolean isAzureSQL = false;
281281

282-
try (Statement stmt = connection.createStatement();
283-
ResultSet editionRS = stmt.executeQuery("SELECT ServerProperty('Edition')")) {
282+
try (final Statement stmt = connection.createStatement();
283+
final ResultSet editionRS = stmt.executeQuery("SELECT ServerProperty('Edition')")) {
284284
isAzureSQL = editionRS.next() && "SQL Azure".equals(editionRS.getString(1));
285285
}
286286

airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java

+2-10
Original file line numberDiff line numberDiff line change
@@ -239,8 +239,7 @@ protected AutoCloseableIterator<AirbyteMessage> createReadIterator(final Databas
239239
airbyteMessageIterator = getFullRefreshStream(database, streamName, namespace, selectedDatabaseFields, table, emittedAt);
240240
}
241241

242-
final JsonSchemaPrimitive cursorType = IncrementalUtils
243-
.getCursorType(airbyteStream, cursorField);
242+
final JsonSchemaPrimitive cursorType = IncrementalUtils.getCursorType(airbyteStream, cursorField);
244243

245244
iterator = AutoCloseableIterators.transform(autoCloseableIterator -> new StateDecoratingIterator(
246245
autoCloseableIterator,
@@ -494,13 +493,6 @@ public abstract AutoCloseableIterator<JsonNode> queryTableFullRefresh(final Data
494493
* Read incremental data from a table. Incremental read should returns only records where cursor
495494
* column value is bigger than cursor.
496495
*
497-
* @param database source database
498-
* @param columnNames interested column names
499-
* @param schemaName table namespace
500-
* @param tableName target table
501-
* @param cursorField cursor field name
502-
* @param cursorFieldType cursor field type
503-
* @param cursor cursor value
504496
* @return iterator with read data
505497
*/
506498
public abstract AutoCloseableIterator<JsonNode> queryTableIncremental(Database database,
@@ -509,7 +501,7 @@ public abstract AutoCloseableIterator<JsonNode> queryTableIncremental(Database d
509501
String tableName,
510502
String cursorField,
511503
DataType cursorFieldType,
512-
String cursor);
504+
String cursorValue);
513505

514506
private Database createDatabaseInternal(final JsonNode sourceConfig) throws Exception {
515507
final Database database = createDatabase(sourceConfig);

0 commit comments

Comments
 (0)