Skip to content

Commit 03584d5

Browse files
authored
[source-mysql/mssql] Fix state manager on determining non-resumable streams (#45181)
1 parent b6825ee commit 03584d5

File tree

6 files changed

+40
-11
lines changed

6 files changed

+40
-11
lines changed

airbyte-integrations/connectors/source-mssql/metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ data:
99
connectorSubtype: database
1010
connectorType: source
1111
definitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
12-
dockerImageTag: 4.1.12
12+
dockerImageTag: 4.1.13
1313
dockerRepository: airbyte/source-mssql
1414
documentationUrl: https://docs.airbyte.com/integrations/sources/mssql
1515
githubIssueLabel: source-mssql

airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/initialsync/MssqlInitialLoadGlobalStateManager.java

+16-4
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ public class MssqlInitialLoadGlobalStateManager extends MssqlInitialLoadStateMan
3030
// No special handling for resumable full refresh streams. We will report the cursor as it is.
3131
private Set<AirbyteStreamNameNamespacePair> resumableFullRefreshStreams;
3232
private Set<AirbyteStreamNameNamespacePair> nonResumableFullRefreshStreams;
33+
private Set<AirbyteStreamNameNamespacePair> completedNonResumableFullRefreshStreams;
3334

3435
public MssqlInitialLoadGlobalStateManager(final InitialLoadStreams initialLoadStreams,
3536
final Map<AirbyteStreamNameNamespacePair, OrderedColumnInfo> pairToOrderedColInfo,
@@ -61,6 +62,7 @@ private void initStreams(final InitialLoadStreams initialLoadStreams,
6162
this.streamsThatHaveCompletedSnapshot = new HashSet<>();
6263
this.resumableFullRefreshStreams = new HashSet<>();
6364
this.nonResumableFullRefreshStreams = new HashSet<>();
65+
this.completedNonResumableFullRefreshStreams = new HashSet<>();
6466

6567
catalog.getStreams().forEach(configuredAirbyteStream -> {
6668
var pairInStream =
@@ -70,7 +72,8 @@ private void initStreams(final InitialLoadStreams initialLoadStreams,
7072
this.streamsThatHaveCompletedSnapshot.add(pairInStream);
7173
}
7274
if (configuredAirbyteStream.getSyncMode() == SyncMode.FULL_REFRESH) {
73-
if (initialLoadStreams.streamsForInitialLoad().contains(configuredAirbyteStream)) {
75+
if (configuredAirbyteStream.getStream().getSourceDefinedPrimaryKey() != null
76+
&& !configuredAirbyteStream.getStream().getSourceDefinedPrimaryKey().isEmpty()) {
7477
this.resumableFullRefreshStreams.add(pairInStream);
7578
} else {
7679
this.nonResumableFullRefreshStreams.add(pairInStream);
@@ -94,6 +97,12 @@ public AirbyteStateMessage generateStateMessageAtCheckpoint(final ConfiguredAirb
9497
}
9598
});
9699

100+
completedNonResumableFullRefreshStreams.forEach(stream -> {
101+
streamStates.add(new AirbyteStreamState()
102+
.withStreamDescriptor(
103+
new StreamDescriptor().withName(stream.getName()).withNamespace(stream.getNamespace())));
104+
});
105+
97106
if (airbyteStream.getSyncMode() == SyncMode.INCREMENTAL) {
98107
AirbyteStreamNameNamespacePair pair =
99108
new AirbyteStreamNameNamespacePair(airbyteStream.getStream().getName(), airbyteStream.getStream().getNamespace());
@@ -119,10 +128,13 @@ private AirbyteStreamState getAirbyteStreamState(final AirbyteStreamNameNamespac
119128

120129
@Override
121130
public AirbyteStateMessage createFinalStateMessage(final ConfiguredAirbyteStream airbyteStream) {
131+
132+
final io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair pair = new io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair(
133+
airbyteStream.getStream().getName(), airbyteStream.getStream().getNamespace());
122134
if (airbyteStream.getSyncMode() == SyncMode.INCREMENTAL) {
123-
io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair pair = new io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair(
124-
airbyteStream.getStream().getName(), airbyteStream.getStream().getNamespace());
125135
streamsThatHaveCompletedSnapshot.add(pair);
136+
} else if (nonResumableFullRefreshStreams.contains(pair)) {
137+
completedNonResumableFullRefreshStreams.add(pair);
126138
}
127139
final List<AirbyteStreamState> streamStates = new ArrayList<>();
128140
streamsThatHaveCompletedSnapshot.forEach(stream -> {
@@ -135,7 +147,7 @@ public AirbyteStateMessage createFinalStateMessage(final ConfiguredAirbyteStream
135147
streamStates.add(getAirbyteStreamState(stream, Jsons.jsonNode(ocStatus)));
136148
});
137149

138-
nonResumableFullRefreshStreams.forEach(stream -> {
150+
completedNonResumableFullRefreshStreams.forEach(stream -> {
139151
streamStates.add(new AirbyteStreamState()
140152
.withStreamDescriptor(
141153
new StreamDescriptor().withName(stream.getName()).withNamespace(stream.getNamespace())));

airbyte-integrations/connectors/source-mysql/metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ data:
99
connectorSubtype: database
1010
connectorType: source
1111
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
12-
dockerImageTag: 3.7.1
12+
dockerImageTag: 3.7.2
1313
dockerRepository: airbyte/source-mysql
1414
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
1515
githubIssueLabel: source-mysql

airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialLoadGlobalStateManager.java

+19-4
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,12 @@
2727
import java.util.Map;
2828
import java.util.Objects;
2929
import java.util.Set;
30+
import org.slf4j.Logger;
31+
import org.slf4j.LoggerFactory;
3032

3133
public class MySqlInitialLoadGlobalStateManager extends MySqlInitialLoadStateManager {
3234

35+
private static final Logger LOGGER = LoggerFactory.getLogger(MySqlInitialLoadGlobalStateManager.class);
3336
protected StateManager stateManager;
3437

3538
// Only one global state is emitted, which is fanned out into many entries in the DB by platform. As
@@ -42,6 +45,7 @@ public class MySqlInitialLoadGlobalStateManager extends MySqlInitialLoadStateMan
4245

4346
// Non-resumable full refresh streams do not have any state. We only report count for them.
4447
private Set<AirbyteStreamNameNamespacePair> nonResumableFullRefreshStreams;
48+
private Set<AirbyteStreamNameNamespacePair> completedNonResumableFullRefreshStreams;
4549

4650
private final boolean savedOffsetStillPresentOnServer;
4751
private final ConfiguredAirbyteCatalog catalog;
@@ -69,6 +73,7 @@ private void initStreams(final InitialLoadStreams initialLoadStreams,
6973
this.streamsThatHaveCompletedSnapshot = new HashSet<>();
7074
this.resumableFullRefreshStreams = new HashSet<>();
7175
this.nonResumableFullRefreshStreams = new HashSet<>();
76+
this.completedNonResumableFullRefreshStreams = new HashSet<>();
7277

7378
catalog.getStreams().forEach(configuredAirbyteStream -> {
7479
var pairInStream =
@@ -78,7 +83,8 @@ private void initStreams(final InitialLoadStreams initialLoadStreams,
7883
this.streamsThatHaveCompletedSnapshot.add(pairInStream);
7984
}
8085
if (configuredAirbyteStream.getSyncMode() == SyncMode.FULL_REFRESH) {
81-
if (initialLoadStreams.streamsForInitialLoad().contains(configuredAirbyteStream)) {
86+
if (configuredAirbyteStream.getStream().getSourceDefinedPrimaryKey() != null
87+
&& !configuredAirbyteStream.getStream().getSourceDefinedPrimaryKey().isEmpty()) {
8288
this.resumableFullRefreshStreams.add(pairInStream);
8389
} else {
8490
this.nonResumableFullRefreshStreams.add(pairInStream);
@@ -115,6 +121,13 @@ public AirbyteStateMessage generateStateMessageAtCheckpoint(final ConfiguredAirb
115121
streamStates.add(getAirbyteStreamState(stream, (Jsons.jsonNode(pkStatus))));
116122
}
117123
});
124+
125+
completedNonResumableFullRefreshStreams.forEach(stream -> {
126+
streamStates.add(new AirbyteStreamState()
127+
.withStreamDescriptor(
128+
new StreamDescriptor().withName(stream.getName()).withNamespace(stream.getNamespace())));
129+
});
130+
118131
if (airbyteStream.getSyncMode() == SyncMode.INCREMENTAL) {
119132
AirbyteStreamNameNamespacePair pair =
120133
new AirbyteStreamNameNamespacePair(airbyteStream.getStream().getName(), airbyteStream.getStream().getNamespace());
@@ -129,10 +142,12 @@ public AirbyteStateMessage generateStateMessageAtCheckpoint(final ConfiguredAirb
129142

130143
@Override
131144
public AirbyteStateMessage createFinalStateMessage(final ConfiguredAirbyteStream airbyteStream) {
145+
final AirbyteStreamNameNamespacePair pair =
146+
new AirbyteStreamNameNamespacePair(airbyteStream.getStream().getName(), airbyteStream.getStream().getNamespace());
132147
if (airbyteStream.getSyncMode() == SyncMode.INCREMENTAL) {
133-
AirbyteStreamNameNamespacePair pair =
134-
new AirbyteStreamNameNamespacePair(airbyteStream.getStream().getName(), airbyteStream.getStream().getNamespace());
135148
streamsThatHaveCompletedSnapshot.add(pair);
149+
} else if (nonResumableFullRefreshStreams.contains(pair)) {
150+
completedNonResumableFullRefreshStreams.add(pair);
136151
}
137152
final List<AirbyteStreamState> streamStates = new ArrayList<>();
138153

@@ -146,7 +161,7 @@ public AirbyteStateMessage createFinalStateMessage(final ConfiguredAirbyteStream
146161
streamStates.add(getAirbyteStreamState(stream, (Jsons.jsonNode(pkStatus))));
147162
});
148163

149-
nonResumableFullRefreshStreams.forEach(stream -> {
164+
completedNonResumableFullRefreshStreams.forEach(stream -> {
150165
streamStates.add(new AirbyteStreamState()
151166
.withStreamDescriptor(
152167
new StreamDescriptor().withName(stream.getName()).withNamespace(stream.getNamespace())));

docs/integrations/sources/mssql.md

+1
Original file line numberDiff line numberDiff line change
@@ -422,6 +422,7 @@ WHERE actor_definition_id ='b5ea17b1-f170-46dc-bc31-cc744ca984c1' AND (configura
422422

423423
| Version | Date | Pull Request | Subject |
424424
|:--------|:-----------|:------------------------------------------------------------------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------|
425+
| 4.1.13 | 2024-09-05 | [45181](https://github.com/airbytehq/airbyte/pull/45181) | Fix incorrect categorization of resumable/non-resumable full refresh streams. |
425426
| 4.1.12 | 2024-09-10 | [45368](https://github.com/airbytehq/airbyte/pull/45368) | Remove excessive debezium logging. |
426427
| 4.1.11 | 2024-09-04 | [45142](https://github.com/airbytehq/airbyte/pull/45142) | Fix incorrect datetimeoffset format in cursor state. |
427428
| 4.1.10 | 2024-08-27 | [44759](https://github.com/airbytehq/airbyte/pull/44759) | Improve null safety in parsing debezium change events. |

docs/integrations/sources/mysql.md

+2-1
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,8 @@ Any database or table encoding combination of charset and collation is supported
233233

234234
| Version | Date | Pull Request | Subject |
235235
|:--------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------|
236-
| 3.7.1 | 2024-08-27 | [44841](https://github.com/airbytehq/airbyte/pull/44841) | Adopt latest CDK. |
236+
| 3.7.2 | 2024-09-05 | [45181](https://github.com/airbytehq/airbyte/pull/45181) | Fix incorrect categorization of resumable/non-resumable full refresh streams. |
237+
| 3.7.1 | 2024-08-27 | [44841](https://github.com/airbytehq/airbyte/pull/44841) | Adopt latest CDK. |
237238
| 3.7.0 | 2024-08-13 | [44013](https://github.com/airbytehq/airbyte/pull/44013) | Upgrading to Debezium 2.7.1.Final |
238239
| 3.6.9 | 2024-08-08 | [43410](https://github.com/airbytehq/airbyte/pull/43410) | Adopt latest CDK. |
239240
| 3.6.8 | 2024-07-30 | [42869](https://github.com/airbytehq/airbyte/pull/42869) | Adopt latest CDK. |

0 commit comments

Comments
 (0)