-
Notifications
You must be signed in to change notification settings - Fork 4.5k
Resumable Full Refresh sync for mssql #37451
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
000d09e
701f879
6961a94
3d7e013
41848ed
3907d9d
9164bfd
92b71b6
651f088
8183126
566e344
c81fa4c
ff6ce90
908d1da
760777a
653c19b
c469c99
4ffe6b2
37fd000
09488ab
4d97b2d
eee5db6
6bd2b75
12997ec
137dd86
1d60fe6
f98f02f
470e432
8f118ad
90b9ca8
ed4e7fe
7cca19e
0d8d3f8
266ff96
9a118ff
34f4d47
da2fbe1
7e2e66e
730b64c
ccf2a3c
f22f33c
cb24220
886c348
d1800e5
6e9299b
05f470e
e78b56e
984c8a4
18843f2
3f04061
d325718
b25a0d6
22aab75
4462470
f3b887d
39f4b2d
ca257d1
afbb193
b94f8e1
2868a08
0e4ccb5
3a941b8
b337bd4
9fefb7a
f9ba78c
468972b
dd61263
7e8a773
8b22e62
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,7 +20,7 @@ | |
import io.airbyte.cdk.integrations.source.relationaldb.models.InternalModels.StateType; | ||
import io.airbyte.cdk.integrations.source.relationaldb.state.StateManager; | ||
import io.airbyte.commons.json.Jsons; | ||
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; | ||
import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair; | ||
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; | ||
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream; | ||
import java.math.BigDecimal; | ||
|
@@ -185,46 +185,46 @@ public static Map<io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair, | |
|
||
final Map<io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair, CursorBasedStatus> cursorBasedStatusMap = new HashMap<>(); | ||
streams.forEach(stream -> { | ||
try { | ||
final String name = stream.getStream().getName(); | ||
final String namespace = stream.getStream().getNamespace(); | ||
final String fullTableName = | ||
getFullyQualifiedTableNameWithQuoting(namespace, name, quoteString); | ||
|
||
final Optional<CursorInfo> cursorInfoOptional = | ||
stateManager.getCursorInfo(new io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair(name, namespace)); | ||
if (cursorInfoOptional.isEmpty()) { | ||
throw new RuntimeException(String.format("Stream %s was not provided with an appropriate cursor", stream.getStream().getName())); | ||
} | ||
final String name = stream.getStream().getName(); | ||
final String namespace = stream.getStream().getNamespace(); | ||
final String fullTableName = | ||
getFullyQualifiedTableNameWithQuoting(namespace, name, quoteString); | ||
|
||
LOGGER.info("Querying max cursor value for {}.{}", namespace, name); | ||
final String cursorField = cursorInfoOptional.get().getCursorField(); | ||
final Optional<CursorInfo> cursorInfoOptional = | ||
stateManager.getCursorInfo(new AirbyteStreamNameNamespacePair(name, namespace)); | ||
if (cursorInfoOptional.isEmpty()) { | ||
throw new RuntimeException(String.format("Stream %s was not provided with an appropriate cursor", stream.getStream().getName())); | ||
} | ||
final CursorBasedStatus cursorBasedStatus = new CursorBasedStatus(); | ||
final Optional<String> maybeCursorField = Optional.ofNullable(cursorInfoOptional.get().getCursorField()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @stephane-airbyte actually this change by @rodireich should have fixed the issue that we were discussing here Noticing the use of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes. This is indeed fixing it. Thank you There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thank you for the discussion that led to this investigation! |
||
maybeCursorField.ifPresent(cursorField -> { | ||
LOGGER.info("Cursor {}. Querying max cursor value for {}.{}", cursorField, namespace, name); | ||
final String quotedCursorField = getIdentifierWithQuoting(cursorField, quoteString); | ||
final String cursorBasedSyncStatusQuery = String.format(MAX_CURSOR_VALUE_QUERY, | ||
quotedCursorField, | ||
fullTableName, | ||
quotedCursorField, | ||
quotedCursorField, | ||
fullTableName); | ||
final List<JsonNode> jsonNodes = database.bufferedResultSetQuery(conn -> conn.prepareStatement(cursorBasedSyncStatusQuery).executeQuery(), | ||
resultSet -> JdbcUtils.getDefaultSourceOperations().rowToJson(resultSet)); | ||
final CursorBasedStatus cursorBasedStatus = new CursorBasedStatus(); | ||
cursorBasedStatus.setStateType(StateType.CURSOR_BASED); | ||
cursorBasedStatus.setVersion(2L); | ||
cursorBasedStatus.setStreamName(name); | ||
cursorBasedStatus.setStreamNamespace(namespace); | ||
final List<JsonNode> jsonNodes; | ||
try { | ||
jsonNodes = database.bufferedResultSetQuery(conn -> conn.prepareStatement(cursorBasedSyncStatusQuery).executeQuery(), | ||
resultSet -> JdbcUtils.getDefaultSourceOperations().rowToJson(resultSet)); | ||
} catch (SQLException e) { | ||
throw new RuntimeException("Failed to read max cursor value from %s.%s".formatted(namespace, name), e); | ||
} | ||
cursorBasedStatus.setCursorField(ImmutableList.of(cursorField)); | ||
|
||
if (!jsonNodes.isEmpty()) { | ||
final JsonNode result = jsonNodes.get(0); | ||
cursorBasedStatus.setCursor(result.get(cursorField).asText()); | ||
cursorBasedStatus.setCursorRecordCount((long) jsonNodes.size()); | ||
} | ||
|
||
cursorBasedStatusMap.put(new io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair(name, namespace), cursorBasedStatus); | ||
} catch (final SQLException e) { | ||
throw new RuntimeException(e); | ||
} | ||
cursorBasedStatus.setStateType(StateType.CURSOR_BASED); | ||
cursorBasedStatus.setVersion(2L); | ||
cursorBasedStatus.setStreamName(name); | ||
cursorBasedStatus.setStreamNamespace(namespace); | ||
cursorBasedStatusMap.put(new AirbyteStreamNameNamespacePair(name, namespace), cursorBasedStatus); | ||
}); | ||
}); | ||
|
||
return cursorBasedStatusMap; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note: This line created a new test db every time an instance was created, leaving it open, while test themselves opened another test db in setup.
This left test db's that were never torn down