-
Notifications
You must be signed in to change notification settings - Fork 4.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Resumable Full Refresh sync for mssql #37451
Conversation
The latest updates on your projects. Learn more about Vercel for Git ↗︎ 1 Ignored Deployment
|
@@ -39,7 +41,7 @@ import org.mockito.Mockito | |||
"The static variables are updated in subclasses for convenience, and cannot be final." | |||
) | |||
abstract class JdbcSourceAcceptanceTest<S : Source, T : TestDatabase<*, T, *>> { | |||
@JvmField protected var testdb: T = createTestDatabase() | |||
@JvmField protected var testdb: T? = null |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note: This line created a new test db every time an instance was created, leaving it open, while test themselves opened another test db in setup.
This left test db's that were never torn down
@@ -281,6 +282,7 @@ public void close() { | |||
bgThread.stop = true; | |||
} | |||
super.close(); | |||
MssqlDebeziumStateUtil.disposeInitialState(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note: Because concurrent mssql tests are reusing threads in a fixed thread pool, it is necessary to dispose of cached state so the next test using this thread will start fresh.
Cached initial state is tied to a thread (ThreadLocal<>
)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pretty clean! I'm halfway through the PR and will finish later.
? new CdcState().withState(initialDebeziumState) | ||
: stateManager.getCdcStateManager().getCdcState(); | ||
cdcStreamsForInitialOrderedColumnLoad(stateManager.getCdcStateManager(), catalog, savedOffsetStillPresentOnServer); | ||
final CdcState stateToBeUsed = getCdcState(database, catalog, stateManager, savedOffsetStillPresentOnServer); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should rename this to initialState or initialStateToBeUsed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
@@ -227,9 +221,11 @@ private AutoCloseableIterator<AirbyteMessage> augmentWithState(final AutoCloseab | |||
final Long syncCheckpointRecords = config.get(SYNC_CHECKPOINT_RECORDS_PROPERTY) != null ? config.get(SYNC_CHECKPOINT_RECORDS_PROPERTY).asLong() | |||
: DebeziumIteratorConstants.SYNC_CHECKPOINT_RECORDS; | |||
|
|||
initialLoadStateManager.setStreamStateForIncrementalRunSupplier(streamStateForIncrementalRunSupplier); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I realized this will be a bug, I reproduced it on mysql and postgres. I think mssql is fine because we set streamStateForIncrementalRunSupplier the same for both full refresh and incremental refresh - so just FYI
Since we share initialLoadStateManager, and we setup the iterators before actually iterating through them, the full refresh setup will overwrite the ones used in incremental iterators.
In postgres and mysql my solution was to provide a optional streamStateForIncrementalRunSupplier and in state manager it will default to emptyObject. So I think we achieved the same thing but just with different approach.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's take this offline. I'd like to understand how mssql is different.
@@ -23,7 +23,8 @@ | |||
import org.junit.jupiter.api.parallel.ExecutionMode; | |||
|
|||
@TestInstance(Lifecycle.PER_METHOD) | |||
@Execution(ExecutionMode.CONCURRENT) | |||
@Execution(ExecutionMode.SAME_THREAD) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this still required?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oops. forgot to change back. thanks
docs/integrations/sources/mssql.md
Outdated
|:--------|:-----------| :---------------------------------------------------------------------------------------------------------------- |:------------------------------------------------------------------------------------------------------------------------------------------------| | ||
| 4.0.15 | 2024-04-22 | [37541](https://github.com/airbytehq/airbyte/pull/37541) | Adopt latest CDK. reduce excessive logs. | | ||
|:--------|:-----------|:------------------------------------------------------------------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------| | ||
| 4.0.16 | 2024-04-30 | [37451](https://github.com/airbytehq/airbyte/pull/37451) | Resumable full refresh read of tables. | |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
note: this needs to be a minor update! and also we need to figure out how to highlight the minimum platform requirement in this doc.
this.pairToOrderedColInfo = pairToOrderedColInfo; | ||
this.pairToOrderedColLoadStatus = MssqlInitialLoadStateManager.initPairToOrderedColumnLoadStatusMap(initialLoadStreams.pairToInitialLoadStatus()); | ||
} | ||
|
||
public MssqlInitialLoadStreamStateManager(final ConfiguredAirbyteCatalog catalog, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can delete this now
throw new RuntimeException(String.format("Stream %s was not provided with an appropriate cursor", stream.getStream().getName())); | ||
} | ||
final CursorBasedStatus cursorBasedStatus = new CursorBasedStatus(); | ||
final Optional<String> maybeCursorField = Optional.ofNullable(cursorInfoOptional.get().getCursorField()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@stephane-airbyte actually this change by @rodireich should have fixed the issue that we were discussing here
Noticing the use of Optional
and ifPresent
function below which were not checked in the previous version. So I will close that issue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes. This is indeed fixing it. Thank you
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for the discussion that led to this investigation!
The implementation for Mssql references MySql's implementation: #36932